From 96228c1fd042481583d31be65c1a656ff98d800d Mon Sep 17 00:00:00 2001 From: akwizgran Date: Wed, 22 Dec 2021 12:58:26 +0000 Subject: [PATCH 1/4] Do all of SimplexOutgoingSession's work on the IoExecutor. --- .../bramble/sync/SimplexOutgoingSession.java | 166 ++++++------------ .../bramble/sync/SyncSessionFactoryImpl.java | 4 +- .../sync/SimplexOutgoingSessionTest.java | 21 +-- 3 files changed, 62 insertions(+), 129 deletions(-) diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java index 90c6be0b4..3e6075614 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java @@ -3,7 +3,6 @@ package org.briarproject.bramble.sync; import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.event.ContactRemovedEvent; import org.briarproject.bramble.api.db.DatabaseComponent; -import org.briarproject.bramble.api.db.DatabaseExecutor; import org.briarproject.bramble.api.db.DbException; import org.briarproject.bramble.api.event.Event; import org.briarproject.bramble.api.event.EventBus; @@ -28,10 +27,6 @@ import java.util.Collection; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; import javax.annotation.concurrent.ThreadSafe; @@ -57,11 +52,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener { private static final Logger LOG = getLogger(SimplexOutgoingSession.class.getName()); - private static final ThrowingRunnable CLOSE = () -> { - }; - private final DatabaseComponent db; - private final Executor dbExecutor; private final EventBus eventBus; private final ContactId contactId; private final TransportId transportId; @@ -69,17 +60,18 @@ class SimplexOutgoingSession implements SyncSession, EventListener { private final boolean eager; private final StreamWriter streamWriter; private final SyncRecordWriter recordWriter; - private final AtomicInteger outstandingQueries; - private final BlockingQueue> writerTasks; private volatile boolean interrupted = false; - SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, - EventBus eventBus, ContactId contactId, TransportId transportId, - long maxLatency, boolean eager, StreamWriter streamWriter, + SimplexOutgoingSession(DatabaseComponent db, + EventBus eventBus, + ContactId contactId, + TransportId transportId, + long maxLatency, + boolean eager, + StreamWriter streamWriter, SyncRecordWriter recordWriter) { this.db = db; - this.dbExecutor = dbExecutor; this.eventBus = eventBus; this.contactId = contactId; this.transportId = transportId; @@ -87,8 +79,6 @@ class SimplexOutgoingSession implements SyncSession, EventListener { this.eager = eager; this.streamWriter = streamWriter; this.recordWriter = recordWriter; - outstandingQueries = new AtomicInteger(2); // One per type of record - writerTasks = new LinkedBlockingQueue<>(); } @IoExecutor @@ -98,22 +88,22 @@ class SimplexOutgoingSession implements SyncSession, EventListener { try { // Send our supported protocol versions recordWriter.writeVersions(new Versions(SUPPORTED_VERSIONS)); - // Start a query for each type of record - dbExecutor.execute(this::generateAck); - if (eager) dbExecutor.execute(this::loadUnackedMessageIds); - else dbExecutor.execute(this::generateBatch); - // Write records until interrupted or no more records to write try { - while (!interrupted) { - ThrowingRunnable task = writerTasks.take(); - if (task == CLOSE) break; - task.run(); + // Send any waiting acks + while (!interrupted) if (!generateAndWriteAck()) break; + // Send any waiting messages + if (eager) { + Map ids = loadUnackedMessageIds(); + while (!interrupted && !ids.isEmpty()) { + generateAndWriteEagerBatch(ids); + } + } else { + while (!interrupted) if (!generateAndWriteBatch()) break; } - streamWriter.sendEndOfStream(); - } catch (InterruptedException e) { - LOG.info("Interrupted while waiting for a record to write"); - Thread.currentThread().interrupt(); + } catch (DbException e) { + logException(LOG, WARNING, e); } + streamWriter.sendEndOfStream(); } finally { eventBus.removeListener(this); } @@ -122,11 +112,6 @@ class SimplexOutgoingSession implements SyncSession, EventListener { @Override public void interrupt() { interrupted = true; - writerTasks.add(CLOSE); - } - - private void decrementOutstandingQueries() { - if (outstandingQueries.decrementAndGet() == 0) writerTasks.add(CLOSE); } @Override @@ -146,26 +131,17 @@ class SimplexOutgoingSession implements SyncSession, EventListener { } } - @DatabaseExecutor - private void loadUnackedMessageIds() { - if (interrupted) return; - try { - Map ids = db.transactionWithResult(true, txn -> - db.getUnackedMessagesToSend(txn, contactId)); - if (LOG.isLoggable(INFO)) { - LOG.info(ids.size() + " unacked messages to send"); - } - if (ids.isEmpty()) decrementOutstandingQueries(); - else dbExecutor.execute(() -> generateEagerBatch(ids)); - } catch (DbException e) { - logException(LOG, WARNING, e); - interrupt(); + private Map loadUnackedMessageIds() throws DbException { + Map ids = db.transactionWithResult(true, txn -> + db.getUnackedMessagesToSend(txn, contactId)); + if (LOG.isLoggable(INFO)) { + LOG.info(ids.size() + " unacked messages to send"); } + return ids; } - @DatabaseExecutor - private void generateEagerBatch(Map ids) { - if (interrupted) return; + private void generateAndWriteEagerBatch(Map ids) + throws DbException, IOException { // Take some message IDs from `ids` to form a batch Collection batchIds = new ArrayList<>(); long totalLength = 0; @@ -181,75 +157,35 @@ class SimplexOutgoingSession implements SyncSession, EventListener { totalLength += length; } if (batchIds.isEmpty()) throw new AssertionError(); - try { - Collection batch = - db.transactionWithResult(false, txn -> - db.generateBatch(txn, contactId, batchIds, - maxLatency)); - writerTasks.add(() -> writeEagerBatch(batch, ids)); - } catch (DbException e) { - logException(LOG, WARNING, e); - interrupt(); + Collection b = db.transactionWithResult(false, txn -> + db.generateBatch(txn, contactId, batchIds, maxLatency)); + // The batch may be empty if some of the messages are no longer shared + if (!b.isEmpty()) { + for (Message m : b) recordWriter.writeMessage(m); + LOG.info("Sent eager batch"); } } - @IoExecutor - private void writeEagerBatch(Collection batch, - Map ids) throws IOException { - if (interrupted) return; - for (Message m : batch) recordWriter.writeMessage(m); - LOG.info("Sent eager batch"); - if (ids.isEmpty()) decrementOutstandingQueries(); - else dbExecutor.execute(() -> generateEagerBatch(ids)); - } - - @DatabaseExecutor - private void generateAck() { - if (interrupted) return; - try { - Ack a = db.transactionWithNullableResult(false, txn -> - db.generateAck(txn, contactId, MAX_MESSAGE_IDS)); - if (LOG.isLoggable(INFO)) - LOG.info("Generated ack: " + (a != null)); - if (a == null) decrementOutstandingQueries(); - else writerTasks.add(() -> writeAck(a)); - } catch (DbException e) { - logException(LOG, WARNING, e); - interrupt(); - } - } - - @IoExecutor - private void writeAck(Ack ack) throws IOException { - if (interrupted) return; - recordWriter.writeAck(ack); + private boolean generateAndWriteAck() throws DbException, IOException { + Ack a = db.transactionWithNullableResult(false, txn -> + db.generateAck(txn, contactId, MAX_MESSAGE_IDS)); + if (LOG.isLoggable(INFO)) + LOG.info("Generated ack: " + (a != null)); + if (a == null) return false; + recordWriter.writeAck(a); LOG.info("Sent ack"); - dbExecutor.execute(this::generateAck); + return true; } - @DatabaseExecutor - private void generateBatch() { - if (interrupted) return; - try { - Collection b = - db.transactionWithNullableResult(false, txn -> - db.generateBatch(txn, contactId, - MAX_RECORD_PAYLOAD_BYTES, maxLatency)); - if (LOG.isLoggable(INFO)) - LOG.info("Generated batch: " + (b != null)); - if (b == null) decrementOutstandingQueries(); - else writerTasks.add(() -> writeBatch(b)); - } catch (DbException e) { - logException(LOG, WARNING, e); - interrupt(); - } - } - - @IoExecutor - private void writeBatch(Collection batch) throws IOException { - if (interrupted) return; - for (Message m : batch) recordWriter.writeMessage(m); + private boolean generateAndWriteBatch() throws DbException, IOException { + Collection b = db.transactionWithNullableResult(false, txn -> + db.generateBatch(txn, contactId, + MAX_RECORD_PAYLOAD_BYTES, maxLatency)); + if (LOG.isLoggable(INFO)) + LOG.info("Generated batch: " + (b != null)); + if (b == null) return false; + for (Message m : b) recordWriter.writeMessage(m); LOG.info("Sent batch"); - dbExecutor.execute(this::generateBatch); + return true; } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java index 70fd0df8b..38aa6daa7 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java @@ -64,8 +64,8 @@ class SyncSessionFactoryImpl implements SyncSessionFactory { OutputStream out = streamWriter.getOutputStream(); SyncRecordWriter recordWriter = recordWriterFactory.createRecordWriter(out); - return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, t, - maxLatency, eager, streamWriter, recordWriter); + return new SimplexOutgoingSession(db, eventBus, c, t, maxLatency, + eager, streamWriter, recordWriter); } @Override diff --git a/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java b/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java index 9bcf7cc3f..bd4bc35f0 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java @@ -14,12 +14,10 @@ import org.briarproject.bramble.api.sync.Versions; import org.briarproject.bramble.api.transport.StreamWriter; import org.briarproject.bramble.test.BrambleMockTestCase; import org.briarproject.bramble.test.DbExpectations; -import org.briarproject.bramble.test.ImmediateExecutor; import org.junit.Test; import java.util.LinkedHashMap; import java.util.Map; -import java.util.concurrent.Executor; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; @@ -41,7 +39,6 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { private final SyncRecordWriter recordWriter = context.mock(SyncRecordWriter.class); - private final Executor dbExecutor = new ImmediateExecutor(); private final ContactId contactId = getContactId(); private final TransportId transportId = getTransportId(); private final Ack ack = @@ -54,7 +51,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { @Test public void testNothingToSend() throws Exception { SimplexOutgoingSession session = new SimplexOutgoingSession(db, - dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, + eventBus, contactId, transportId, MAX_LATENCY, false, streamWriter, recordWriter); Transaction noAckTxn = new Transaction(null, false); @@ -88,7 +85,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { @Test public void testNothingToSendEagerly() throws Exception { SimplexOutgoingSession session = new SimplexOutgoingSession(db, - dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, + eventBus, contactId, transportId, MAX_LATENCY, true, streamWriter, recordWriter); Transaction noAckTxn = new Transaction(null, false); @@ -121,7 +118,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { @Test public void testSomethingToSend() throws Exception { SimplexOutgoingSession session = new SimplexOutgoingSession(db, - dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, + eventBus, contactId, transportId, MAX_LATENCY, false, streamWriter, recordWriter); Transaction ackTxn = new Transaction(null, false); @@ -140,6 +137,11 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { oneOf(db).generateAck(ackTxn, contactId, MAX_MESSAGE_IDS); will(returnValue(ack)); oneOf(recordWriter).writeAck(ack); + // No more acks + oneOf(db).transactionWithNullableResult(with(false), + withNullableDbCallable(noAckTxn)); + oneOf(db).generateAck(noAckTxn, contactId, MAX_MESSAGE_IDS); + will(returnValue(null)); // One message to send oneOf(db).transactionWithNullableResult(with(false), withNullableDbCallable(msgTxn)); @@ -147,11 +149,6 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { MAX_RECORD_PAYLOAD_BYTES, MAX_LATENCY); will(returnValue(singletonList(message))); oneOf(recordWriter).writeMessage(message); - // No more acks - oneOf(db).transactionWithNullableResult(with(false), - withNullableDbCallable(noAckTxn)); - oneOf(db).generateAck(noAckTxn, contactId, MAX_MESSAGE_IDS); - will(returnValue(null)); // No more messages oneOf(db).transactionWithNullableResult(with(false), withNullableDbCallable(noMsgTxn)); @@ -170,7 +167,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { @Test public void testSomethingToSendEagerly() throws Exception { SimplexOutgoingSession session = new SimplexOutgoingSession(db, - dbExecutor, eventBus, contactId, transportId, MAX_LATENCY, + eventBus, contactId, transportId, MAX_LATENCY, true, streamWriter, recordWriter); Map unacked = new LinkedHashMap<>(); From 8c33ea5a6ba48f26edad61ce9198e1a793e9e1cc Mon Sep 17 00:00:00 2001 From: akwizgran Date: Fri, 24 Dec 2021 12:00:15 +0000 Subject: [PATCH 2/4] Add javadocs for database. --- .../bramble/api/db/DatabaseComponent.java | 7 ++ .../bramble/api/db/DatabaseExecutor.java | 4 ++ .../bramble/api/db/Transaction.java | 6 ++ .../bramble/api/db/TransactionManager.java | 68 +++++++++++++++---- 4 files changed, 73 insertions(+), 12 deletions(-) diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java index ea507351b..975c4ba15 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java @@ -33,11 +33,18 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; /** * Encapsulates the database implementation and exposes high-level operations * to other components. + *

+ * With the exception of the {@link #open(SecretKey, MigrationListener)} and + * {@link #close()} methods, which must not be called concurrently, the + * database can be accessed from any thread. See {@link TransactionManager} + * for locking behaviour. */ +@ThreadSafe @NotNullByDefault public interface DatabaseComponent extends TransactionManager { diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseExecutor.java b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseExecutor.java index 256be229a..b1fae6e05 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseExecutor.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseExecutor.java @@ -18,6 +18,10 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME; * submitted, tasks are not run concurrently, and submitting a task will never * block. Tasks must not run indefinitely. Tasks submitted during shutdown are * discarded. + *

+ * It is not mandatory to use this executor for database tasks. The database + * can be accessed from any thread, but this executor's guarantee that tasks + * are run in the order they're submitted may be useful in some cases. */ @Qualifier @Target({FIELD, METHOD, PARAMETER}) diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/db/Transaction.java b/bramble-api/src/main/java/org/briarproject/bramble/api/db/Transaction.java index e54da0064..cdb6e3980 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/db/Transaction.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/db/Transaction.java @@ -45,6 +45,9 @@ public class Transaction { /** * Attaches an event to be broadcast when the transaction has been * committed. The event will be broadcast on the {@link EventExecutor}. + * Events and {@link #attach(Runnable) tasks} are submitted to the + * {@link EventExecutor} in the order they were attached to the + * transaction. */ public void attach(Event e) { if (actions == null) actions = new ArrayList<>(); @@ -54,6 +57,9 @@ public class Transaction { /** * Attaches a task to be executed when the transaction has been * committed. The task will be run on the {@link EventExecutor}. + * {@link #attach(Event) Events} and tasks are submitted to the + * {@link EventExecutor} in the order they were attached to the + * transaction. */ public void attach(Runnable r) { if (actions == null) actions = new ArrayList<>(); diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/db/TransactionManager.java b/bramble-api/src/main/java/org/briarproject/bramble/api/db/TransactionManager.java index 6850c5b99..2ed25caf3 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/db/TransactionManager.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/db/TransactionManager.java @@ -1,51 +1,95 @@ package org.briarproject.bramble.api.db; +import org.briarproject.bramble.api.event.EventExecutor; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; +/** + * An interface for managing database transactions. + *

+ * Read-only transactions may access the database concurrently. Read-write + * transactions access the database exclusively, so starting a read-only or + * read-write transaction will block until there are no read-write + * transactions in progress. + *

+ * Failing to {@link #endTransaction(Transaction) end} a transaction will + * prevent other callers from accessing the database, so it is recommended to + * use the {@link #transaction(boolean, DbRunnable)}, + * {@link #transactionWithResult(boolean, DbCallable)} and + * {@link #transactionWithNullableResult(boolean, NullableDbCallable)} methods + * where possible, which handle committing or aborting the transaction on the + * caller's behalf. + *

+ * Transactions are not reentrant, i.e. it is not permitted to start a + * transaction on a thread that already has a transaction in progress. + */ +@ThreadSafe @NotNullByDefault public interface TransactionManager { /** - * Starts a new transaction and returns an object representing it. - *

- * This method acquires locks, so it must not be called while holding a - * lock. + * Starts a new transaction and returns an object representing it. This + * method acquires the database lock, which is held until + * {@link #endTransaction(Transaction)} is called. * - * @param readOnly true if the transaction will only be used for reading. + * @param readOnly True if the transaction will only be used for reading, + * in which case the database lock can be shared with other read-only + * transactions. */ Transaction startTransaction(boolean readOnly) throws DbException; /** * Commits a transaction to the database. + * {@link #endTransaction(Transaction)} must be called to release the + * database lock. */ void commitTransaction(Transaction txn) throws DbException; /** - * Ends a transaction. If the transaction has not been committed, - * it will be aborted. If the transaction has been committed, - * any events attached to the transaction are broadcast. - * The database lock will be released in either case. + * Ends a transaction. If the transaction has not been committed by + * calling {@link #commitTransaction(Transaction)}, it is aborted and the + * database lock is released. + *

+ * If the transaction has been committed, any + * {@link Transaction#attach events} attached to the transaction are + * broadcast and any {@link Transaction#attach(Runnable) tasks} attached + * to the transaction are submitted to the {@link EventExecutor}. The + * database lock is then released. */ void endTransaction(Transaction txn); /** - * Runs the given task within a transaction. + * Runs the given task within a transaction. The database lock is held + * while running the task. + * + * @param readOnly True if the transaction will only be used for reading, + * in which case the database lock can be shared with other read-only + * transactions. */ void transaction(boolean readOnly, DbRunnable task) throws DbException, E; /** * Runs the given task within a transaction and returns the result of the - * task. + * task. The database lock is held while running the task. + * + * @param readOnly True if the transaction will only be used for reading, + * in which case the database lock can be shared with other read-only + * transactions. */ R transactionWithResult(boolean readOnly, DbCallable task) throws DbException, E; /** * Runs the given task within a transaction and returns the result of the - * task, which may be null. + * task, which may be null. The database lock is held while running the + * task. + * + * @param readOnly True if the transaction will only be used for reading, + * in which case the database lock can be shared with other read-only + * transactions. */ @Nullable R transactionWithNullableResult(boolean readOnly, From 0691354952e0f03eb9b0897e26a3dea92acadc12 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Thu, 17 Mar 2022 14:49:56 +0000 Subject: [PATCH 3/4] Defer marking messages and acks as sent. --- .../bramble/api/db/DatabaseComponent.java | 97 ++++++-- .../bramble/api/mailbox/MailboxConstants.java | 23 ++ .../bramble/api/record/RecordWriter.java | 2 + .../bramble/api/sync/DeferredSendHandler.java | 15 ++ .../bramble/api/sync/SyncRecordWriter.java | 2 + .../org/briarproject/bramble/db/Database.java | 22 +- .../bramble/db/DatabaseComponentImpl.java | 121 ++++++--- .../briarproject/bramble/db/JdbcDatabase.java | 59 +++-- .../bramble/record/RecordWriterImpl.java | 8 + .../bramble/sync/DuplexOutgoingSession.java | 18 +- .../sync/EagerSimplexOutgoingSession.java | 66 +++++ .../bramble/sync/MailboxOutgoingSession.java | 117 +++++++++ .../bramble/sync/SimplexOutgoingSession.java | 104 +++----- .../bramble/sync/SyncRecordWriterImpl.java | 5 + .../bramble/sync/SyncSessionFactoryImpl.java | 9 +- .../bramble/db/DatabaseComponentImplTest.java | 196 ++++++++++++++- .../bramble/db/DatabasePerformanceTest.java | 9 +- .../bramble/db/JdbcDatabaseTest.java | 24 +- .../sync/EagerSimplexOutgoingSessionTest.java | 134 ++++++++++ .../sync/MailboxOutgoingSessionTest.java | 231 ++++++++++++++++++ .../sync/SimplexOutgoingSessionTest.java | 110 +-------- 21 files changed, 1085 insertions(+), 287 deletions(-) create mode 100644 bramble-api/src/main/java/org/briarproject/bramble/api/mailbox/MailboxConstants.java create mode 100644 bramble-api/src/main/java/org/briarproject/bramble/api/sync/DeferredSendHandler.java create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/sync/EagerSimplexOutgoingSession.java create mode 100644 bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java create mode 100644 bramble-core/src/test/java/org/briarproject/bramble/sync/EagerSimplexOutgoingSessionTest.java create mode 100644 bramble-core/src/test/java/org/briarproject/bramble/sync/MailboxOutgoingSessionTest.java diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java index 975c4ba15..34a2928f3 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java @@ -200,26 +200,15 @@ public interface DatabaseComponent extends TransactionManager { throws DbException; /** - * Returns a batch of messages for the given contact, with a total length - * less than or equal to the given length, for transmission over a - * transport with the given maximum latency. Returns null if there are no - * sendable messages that fit in the given length. + * Returns a batch of messages for the given contact, for transmission over + * a transport with the given maximum latency. The total length of the + * messages, including record headers, will be no more than the given + * capacity. Returns null if there are no sendable messages that would fit + * in the given capacity. */ @Nullable Collection generateBatch(Transaction txn, ContactId c, - int maxLength, long maxLatency) throws DbException; - - /** - * Returns a batch of messages for the given contact containing the - * messages with the given IDs, for transmission over a transport with - * the given maximum latency. - *

- * If any of the given messages are not in the database or are not visible - * to the contact, they are omitted from the batch without throwing an - * exception. - */ - Collection generateBatch(Transaction txn, ContactId c, - Collection ids, long maxLatency) throws DbException; + int capacity, long maxLatency) throws DbException; /** * Returns an offer for the given contact for transmission over a @@ -239,15 +228,16 @@ public interface DatabaseComponent extends TransactionManager { throws DbException; /** - * Returns a batch of messages for the given contact, with a total length - * less than or equal to the given length, for transmission over a - * transport with the given maximum latency. Only messages that have been - * requested by the contact are returned. Returns null if there are no - * sendable messages that fit in the given length. + * Returns a batch of messages for the given contact, for transmission over + * a transport with the given maximum latency. Only messages that have been + * requested by the contact are returned. The total length of the messages, + * including record headers, will be no more than the given capacity. + * Returns null if there are no sendable messages that have been requested + * by the contact and would fit in the given capacity. */ @Nullable Collection generateRequestedBatch(Transaction txn, ContactId c, - int maxLength, long maxLatency) throws DbException; + int capacity, long maxLatency) throws DbException; /** * Returns the contact with the given ID. @@ -351,6 +341,30 @@ public interface DatabaseComponent extends TransactionManager { Collection getMessageIds(Transaction txn, GroupId g, Metadata query) throws DbException; + /** + * Returns the IDs of some messages received from the given contact that + * need to be acknowledged, up to the given number of messages. + *

+ * Read-only. + */ + Collection getMessagesToAck(Transaction txn, ContactId c, + int maxMessages) throws DbException; + + /** + * Returns the IDs of some messages that are eligible to be sent to the + * given contact over a transport with the given maximum latency. The total + * length of the messages including record headers will be no more than the + * given capacity. + *

+ * Unlike {@link #getUnackedMessagesToSend(Transaction, ContactId)} this + * method does not return messages that have already been sent unless they + * are due for retransmission. + *

+ * Read-only. + */ + Collection getMessagesToSend(Transaction txn, ContactId c, + int capacity, long maxLatency) throws DbException; + /** * Returns the IDs of any messages that need to be validated. *

@@ -467,15 +481,30 @@ public interface DatabaseComponent extends TransactionManager { MessageStatus getMessageStatus(Transaction txn, ContactId c, MessageId m) throws DbException; + /** + * Returns the message with the given ID for transmission to the given + * contact over a transport with the given maximum latency. Returns null + * if the message is no longer visible to the contact. + * + * @param markAsSent True if the message should be marked as sent. + * If false it can be marked as sent by calling + * {@link #setMessagesSent(Transaction, ContactId, Collection, long)}. + */ + @Nullable + Message getMessageToSend(Transaction txn, ContactId c, MessageId m, + long maxLatency, boolean markAsSent) throws DbException; + /** * Returns the IDs of all messages that are eligible to be sent to the - * given contact, together with their raw lengths. This may include - * messages that have already been sent and are not yet due for - * retransmission. + * given contact. + *

+ * Unlike {@link #getMessagesToSend(Transaction, ContactId, int, long)} + * this method may return messages that have already been sent and are + * not yet due for retransmission. *

* Read-only. */ - Map getUnackedMessagesToSend(Transaction txn, + Collection getUnackedMessagesToSend(Transaction txn, ContactId c) throws DbException; /** @@ -655,6 +684,13 @@ public interface DatabaseComponent extends TransactionManager { void removeTransportKeys(Transaction txn, TransportId t, KeySetId k) throws DbException; + /** + * Records an ack for the given messages as having been sent to the given + * contact. + */ + void setAckSent(Transaction txn, ContactId c, Collection acked) + throws DbException; + /** * Sets the cleanup timer duration for the given message. This does not * start the message's cleanup timer. @@ -701,6 +737,13 @@ public interface DatabaseComponent extends TransactionManager { void setMessageState(Transaction txn, MessageId m, MessageState state) throws DbException; + /** + * Records the given messages as having been sent to the given contact + * over a transport with the given maximum latency. + */ + void setMessagesSent(Transaction txn, ContactId c, + Collection sent, long maxLatency) throws DbException; + /** * Adds dependencies for a message */ diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/mailbox/MailboxConstants.java b/bramble-api/src/main/java/org/briarproject/bramble/api/mailbox/MailboxConstants.java new file mode 100644 index 000000000..976e86c90 --- /dev/null +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/mailbox/MailboxConstants.java @@ -0,0 +1,23 @@ +package org.briarproject.bramble.api.mailbox; + +import static org.briarproject.bramble.api.transport.TransportConstants.MAX_FRAME_LENGTH; +import static org.briarproject.bramble.api.transport.TransportConstants.MAX_PAYLOAD_LENGTH; +import static org.briarproject.bramble.api.transport.TransportConstants.STREAM_HEADER_LENGTH; +import static org.briarproject.bramble.api.transport.TransportConstants.TAG_LENGTH; + +public interface MailboxConstants { + + /** + * The maximum length of a file that can be uploaded to or downloaded from + * a mailbox. + */ + int MAX_FILE_BYTES = 1024 * 1024; + + /** + * The maximum length of the plaintext payload of a file, such that the + * ciphertext is no more than {@link #MAX_FILE_BYTES}. + */ + int MAX_FILE_PAYLOAD_BYTES = + (MAX_FILE_BYTES - TAG_LENGTH - STREAM_HEADER_LENGTH) + / MAX_FRAME_LENGTH * MAX_PAYLOAD_LENGTH; +} diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/record/RecordWriter.java b/bramble-api/src/main/java/org/briarproject/bramble/api/record/RecordWriter.java index eb83d4d41..893e5ff53 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/record/RecordWriter.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/record/RecordWriter.java @@ -12,4 +12,6 @@ public interface RecordWriter { void flush() throws IOException; void close() throws IOException; + + long getBytesWritten(); } diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/DeferredSendHandler.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/DeferredSendHandler.java new file mode 100644 index 000000000..1966b3bb6 --- /dev/null +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/DeferredSendHandler.java @@ -0,0 +1,15 @@ +package org.briarproject.bramble.api.sync; + +import java.util.Collection; + +/** + * An interface for holding the IDs of messages sent and acked during an + * outgoing {@link SyncSession} so they can be recorded in the DB as sent + * or acked at some later time. + */ +public interface DeferredSendHandler { + + void onAckSent(Collection acked); + + void onMessageSent(MessageId sent); +} diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncRecordWriter.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncRecordWriter.java index 75d4b8401..4234f50fb 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncRecordWriter.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncRecordWriter.java @@ -20,4 +20,6 @@ public interface SyncRecordWriter { void writePriority(Priority p) throws IOException; void flush() throws IOException; + + long getBytesWritten(); } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java b/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java index 921580049..1d0c6bedc 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java @@ -406,6 +406,12 @@ interface Database { Collection getMessageIds(T txn, GroupId g, Metadata query) throws DbException; + /** + * Returns the length of the given message in bytes, including the + * message header. + */ + int getMessageLength(T txn, MessageId m) throws DbException; + /** * Returns the metadata for all delivered messages in the given group. *

@@ -496,7 +502,8 @@ interface Database { /** * Returns the IDs of some messages that are eligible to be sent to the - * given contact, up to the given total length. + * given contact. The total length of the messages including record headers + * will be no more than the given capacity. *

* Unlike {@link #getUnackedMessagesToSend(Object, ContactId)} this method * does not return messages that have already been sent unless they are @@ -504,12 +511,12 @@ interface Database { *

* Read-only. */ - Collection getMessagesToSend(T txn, ContactId c, int maxLength, + Collection getMessagesToSend(T txn, ContactId c, int capacity, long maxLatency) throws DbException; /** * Returns the IDs of all messages that are eligible to be sent to the - * given contact, together with their raw lengths. + * given contact. *

* Unlike {@link #getMessagesToSend(Object, ContactId, int, long)} this * method may return messages that have already been sent and are not yet @@ -517,7 +524,7 @@ interface Database { *

* Read-only. */ - Map getUnackedMessagesToSend(T txn, ContactId c) + Collection getUnackedMessagesToSend(T txn, ContactId c) throws DbException; /** @@ -598,13 +605,14 @@ interface Database { /** * Returns the IDs of some messages that are eligible to be sent to the - * given contact and have been requested by the contact, up to the given - * total length. + * given contact and have been requested by the contact. The total length + * of the messages including record headers will be no more than the given + * capacity. *

* Read-only. */ Collection getRequestedMessagesToSend(T txn, ContactId c, - int maxLength, long maxLatency) throws DbException; + int capacity, long maxLatency) throws DbException; /** * Returns all settings in the given namespace. diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java index 7e4eded83..895e4a0ce 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java @@ -75,7 +75,6 @@ import org.briarproject.bramble.api.transport.TransportKeys; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; @@ -87,6 +86,7 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import javax.inject.Inject; +import static java.util.Collections.singletonList; import static java.util.logging.Level.WARNING; import static java.util.logging.Logger.getLogger; import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE; @@ -424,13 +424,14 @@ class DatabaseComponentImpl implements DatabaseComponent { @Nullable @Override public Collection generateBatch(Transaction transaction, - ContactId c, int maxLength, long maxLatency) throws DbException { + ContactId c, int capacity, long maxLatency) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); T txn = unbox(transaction); if (!db.containsContact(txn, c)) throw new NoSuchContactException(); Collection ids = - db.getMessagesToSend(txn, c, maxLength, maxLatency); + db.getMessagesToSend(txn, c, capacity, maxLatency); + if (ids.isEmpty()) return null; long totalLength = 0; List messages = new ArrayList<>(ids.size()); for (MessageId m : ids) { @@ -439,38 +440,11 @@ class DatabaseComponentImpl implements DatabaseComponent { messages.add(message); db.updateRetransmissionData(txn, c, m, maxLatency); } - if (ids.isEmpty()) return null; db.lowerRequestedFlag(txn, c, ids); transaction.attach(new MessagesSentEvent(c, ids, totalLength)); return messages; } - @Override - public Collection generateBatch(Transaction transaction, - ContactId c, Collection ids, long maxLatency) - throws DbException { - if (transaction.isReadOnly()) throw new IllegalArgumentException(); - T txn = unbox(transaction); - if (!db.containsContact(txn, c)) - throw new NoSuchContactException(); - long totalLength = 0; - List messages = new ArrayList<>(ids.size()); - List sentIds = new ArrayList<>(ids.size()); - for (MessageId m : ids) { - if (db.containsVisibleMessage(txn, c, m)) { - Message message = db.getMessage(txn, m); - totalLength += message.getRawLength(); - messages.add(message); - sentIds.add(m); - db.updateRetransmissionData(txn, c, m, maxLatency); - } - } - if (messages.isEmpty()) return messages; - db.lowerRequestedFlag(txn, c, sentIds); - transaction.attach(new MessagesSentEvent(c, sentIds, totalLength)); - return messages; - } - @Nullable @Override public Offer generateOffer(Transaction transaction, ContactId c, @@ -505,13 +479,14 @@ class DatabaseComponentImpl implements DatabaseComponent { @Nullable @Override public Collection generateRequestedBatch(Transaction transaction, - ContactId c, int maxLength, long maxLatency) throws DbException { + ContactId c, int capacity, long maxLatency) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); T txn = unbox(transaction); if (!db.containsContact(txn, c)) throw new NoSuchContactException(); Collection ids = - db.getRequestedMessagesToSend(txn, c, maxLength, maxLatency); + db.getRequestedMessagesToSend(txn, c, capacity, maxLatency); + if (ids.isEmpty()) return null; long totalLength = 0; List messages = new ArrayList<>(ids.size()); for (MessageId m : ids) { @@ -520,7 +495,6 @@ class DatabaseComponentImpl implements DatabaseComponent { messages.add(message); db.updateRetransmissionData(txn, c, m, maxLatency); } - if (ids.isEmpty()) return null; db.lowerRequestedFlag(txn, c, ids); transaction.attach(new MessagesSentEvent(c, ids, totalLength)); return messages; @@ -635,6 +609,24 @@ class DatabaseComponentImpl implements DatabaseComponent { return db.getMessageIds(txn, g, query); } + @Override + public Collection getMessagesToAck(Transaction transaction, + ContactId c, int maxMessages) throws DbException { + T txn = unbox(transaction); + if (!db.containsContact(txn, c)) + throw new NoSuchContactException(); + return db.getMessagesToAck(txn, c, maxMessages); + } + + @Override + public Collection getMessagesToSend(Transaction transaction, + ContactId c, int capacity, long maxLatency) throws DbException { + T txn = unbox(transaction); + if (!db.containsContact(txn, c)) + throw new NoSuchContactException(); + return db.getMessagesToSend(txn, c, capacity, maxLatency); + } + @Override public Collection getMessagesToValidate(Transaction transaction) throws DbException { @@ -740,10 +732,29 @@ class DatabaseComponentImpl implements DatabaseComponent { return status; } + @Nullable @Override - public Map getUnackedMessagesToSend( - Transaction transaction, - ContactId c) throws DbException { + public Message getMessageToSend(Transaction transaction, ContactId c, + MessageId m, long maxLatency, boolean markAsSent) + throws DbException { + if (transaction.isReadOnly()) throw new IllegalArgumentException(); + T txn = unbox(transaction); + if (!db.containsContact(txn, c)) + throw new NoSuchContactException(); + if (!db.containsVisibleMessage(txn, c, m)) return null; + Message message = db.getMessage(txn, m); + if (markAsSent) { + db.updateRetransmissionData(txn, c, m, maxLatency); + db.lowerRequestedFlag(txn, c, singletonList(m)); + transaction.attach(new MessagesSentEvent(c, singletonList(m), + message.getRawLength())); + } + return message; + } + + @Override + public Collection getUnackedMessagesToSend( + Transaction transaction, ContactId c) throws DbException { T txn = unbox(transaction); if (!db.containsContact(txn, c)) throw new NoSuchContactException(); @@ -1069,6 +1080,20 @@ class DatabaseComponentImpl implements DatabaseComponent { db.removeTransportKeys(txn, t, k); } + @Override + public void setAckSent(Transaction transaction, ContactId c, + Collection acked) throws DbException { + if (transaction.isReadOnly()) throw new IllegalArgumentException(); + T txn = unbox(transaction); + if (!db.containsContact(txn, c)) + throw new NoSuchContactException(); + List visible = new ArrayList<>(acked.size()); + for (MessageId m : acked) { + if (db.containsVisibleMessage(txn, c, m)) visible.add(m); + } + db.lowerAckFlag(txn, c, visible); + } + @Override public void setCleanupTimerDuration(Transaction transaction, MessageId m, long duration) throws DbException { @@ -1115,7 +1140,7 @@ class DatabaseComponentImpl implements DatabaseComponent { if (old == INVISIBLE) db.addGroupVisibility(txn, c, g, v == SHARED); else if (v == INVISIBLE) db.removeGroupVisibility(txn, c, g); else db.setGroupVisibility(txn, c, g, v == SHARED); - List affected = Collections.singletonList(c); + List affected = singletonList(c); transaction.attach(new GroupVisibilityUpdatedEvent(affected)); } @@ -1163,6 +1188,28 @@ class DatabaseComponentImpl implements DatabaseComponent { transaction.attach(new MessageStateChangedEvent(m, false, state)); } + @Override + public void setMessagesSent(Transaction transaction, ContactId c, + Collection sent, long maxLatency) throws DbException { + if (transaction.isReadOnly()) throw new IllegalArgumentException(); + T txn = unbox(transaction); + if (!db.containsContact(txn, c)) + throw new NoSuchContactException(); + long totalLength = 0; + List visible = new ArrayList<>(sent.size()); + for (MessageId m : sent) { + if (db.containsVisibleMessage(txn, c, m)) { + visible.add(m); + totalLength += db.getMessageLength(txn, m); + db.updateRetransmissionData(txn, c, m, maxLatency); + } + } + db.lowerRequestedFlag(txn, c, visible); + if (!visible.isEmpty()) { + transaction.attach(new MessagesSentEvent(c, visible, totalLength)); + } + } + @Override public void addMessageDependencies(Transaction transaction, Message dependent, Collection dependencies) diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java index 3ae8f7255..52d92eb9f 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java @@ -51,7 +51,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -76,6 +75,7 @@ import static java.util.logging.Logger.getLogger; import static org.briarproject.bramble.api.db.DatabaseComponent.NO_CLEANUP_DEADLINE; import static org.briarproject.bramble.api.db.DatabaseComponent.TIMER_NOT_STARTED; import static org.briarproject.bramble.api.db.Metadata.REMOVE; +import static org.briarproject.bramble.api.record.Record.RECORD_HEADER_BYTES; import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE; import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED; import static org.briarproject.bramble.api.sync.Group.Visibility.VISIBLE; @@ -1879,6 +1879,31 @@ abstract class JdbcDatabase implements Database { } } + @Override + public int getMessageLength(Connection txn, MessageId m) + throws DbException { + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "SELECT length from messages" + + " WHERE messageId = ? AND state = ?"; + ps = txn.prepareStatement(sql); + ps.setBytes(1, m.getBytes()); + ps.setInt(2, DELIVERED.getValue()); + rs = ps.executeQuery(); + if (!rs.next()) throw new DbStateException(); + int length = rs.getInt(1); + if (rs.next()) throw new DbStateException(); + rs.close(); + ps.close(); + return length; + } catch (SQLException e) { + tryToClose(rs, LOG, WARNING); + tryToClose(ps, LOG, WARNING); + throw new DbException(e); + } + } + @Override public Map getMessageMetadata(Connection txn, GroupId g) throws DbException { @@ -2227,8 +2252,8 @@ abstract class JdbcDatabase implements Database { } @Override - public Collection getMessagesToSend(Connection txn, ContactId c, - int maxLength, long maxLatency) throws DbException { + public Collection getMessagesToSend(Connection txn, + ContactId c, int capacity, long maxLatency) throws DbException { long now = clock.currentTimeMillis(); PreparedStatement ps = null; ResultSet rs = null; @@ -2248,12 +2273,11 @@ abstract class JdbcDatabase implements Database { ps.setLong(4, maxLatency); rs = ps.executeQuery(); List ids = new ArrayList<>(); - int total = 0; while (rs.next()) { int length = rs.getInt(1); - if (total + length > maxLength) break; + if (capacity < RECORD_HEADER_BYTES + length) break; ids.add(new MessageId(rs.getBytes(2))); - total += length; + capacity -= RECORD_HEADER_BYTES + length; } rs.close(); ps.close(); @@ -2266,12 +2290,12 @@ abstract class JdbcDatabase implements Database { } @Override - public Map getUnackedMessagesToSend(Connection txn, + public Collection getUnackedMessagesToSend(Connection txn, ContactId c) throws DbException { PreparedStatement ps = null; ResultSet rs = null; try { - String sql = "SELECT length, messageId FROM statuses" + String sql = "SELECT messageId FROM statuses" + " WHERE contactId = ? AND state = ?" + " AND groupShared = TRUE AND messageShared = TRUE" + " AND deleted = FALSE AND seen = FALSE" @@ -2280,15 +2304,11 @@ abstract class JdbcDatabase implements Database { ps.setInt(1, c.getInt()); ps.setInt(2, DELIVERED.getValue()); rs = ps.executeQuery(); - Map results = new LinkedHashMap<>(); - while (rs.next()) { - int length = rs.getInt(1); - MessageId id = new MessageId(rs.getBytes(2)); - results.put(id, length); - } + List ids = new ArrayList<>(); + while (rs.next()) ids.add(new MessageId(rs.getBytes(1))); rs.close(); ps.close(); - return results; + return ids; } catch (SQLException e) { tryToClose(rs, LOG, WARNING); tryToClose(ps, LOG, WARNING); @@ -2401,6 +2421,7 @@ abstract class JdbcDatabase implements Database { MessageId m = new MessageId(rs.getBytes(1)); GroupId g = new GroupId(rs.getBytes(2)); Collection messageIds = ids.get(g); + //noinspection Java8MapApi if (messageIds == null) { messageIds = new ArrayList<>(); ids.put(g, messageIds); @@ -2527,7 +2548,7 @@ abstract class JdbcDatabase implements Database { @Override public Collection getRequestedMessagesToSend(Connection txn, - ContactId c, int maxLength, long maxLatency) throws DbException { + ContactId c, int capacity, long maxLatency) throws DbException { long now = clock.currentTimeMillis(); PreparedStatement ps = null; ResultSet rs = null; @@ -2547,12 +2568,11 @@ abstract class JdbcDatabase implements Database { ps.setLong(4, maxLatency); rs = ps.executeQuery(); List ids = new ArrayList<>(); - int total = 0; while (rs.next()) { int length = rs.getInt(1); - if (total + length > maxLength) break; + if (capacity < RECORD_HEADER_BYTES + length) break; ids.add(new MessageId(rs.getBytes(2))); - total += length; + capacity -= RECORD_HEADER_BYTES + length; } rs.close(); ps.close(); @@ -2706,6 +2726,7 @@ abstract class JdbcDatabase implements Database { ContactId c = new ContactId(rs.getInt(1)); TransportId t = new TransportId(rs.getString(2)); Collection transportIds = ids.get(c); + //noinspection Java8MapApi if (transportIds == null) { transportIds = new ArrayList<>(); ids.put(c, transportIds); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/record/RecordWriterImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/record/RecordWriterImpl.java index ec9f5c0c0..137af59dc 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/record/RecordWriterImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/record/RecordWriterImpl.java @@ -19,6 +19,8 @@ class RecordWriterImpl implements RecordWriter { private final OutputStream out; private final byte[] header = new byte[RECORD_HEADER_BYTES]; + private long bytesWritten = 0; + RecordWriterImpl(OutputStream out) { this.out = out; } @@ -31,6 +33,7 @@ class RecordWriterImpl implements RecordWriter { ByteUtils.writeUint16(payload.length, header, 2); out.write(header); out.write(payload); + bytesWritten += RECORD_HEADER_BYTES + payload.length; } @Override @@ -42,4 +45,9 @@ class RecordWriterImpl implements RecordWriter { public void close() throws IOException { out.close(); } + + @Override + public long getBytesWritten() { + return bytesWritten; + } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java index 92e287203..349b8d37b 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java @@ -13,11 +13,13 @@ import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.plugin.event.TransportInactiveEvent; +import org.briarproject.bramble.api.record.Record; import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Message; import org.briarproject.bramble.api.sync.Offer; import org.briarproject.bramble.api.sync.Priority; import org.briarproject.bramble.api.sync.Request; +import org.briarproject.bramble.api.sync.SyncConstants; import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.Versions; @@ -47,8 +49,9 @@ import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; import static java.util.logging.Logger.getLogger; import static org.briarproject.bramble.api.lifecycle.LifecycleManager.LifecycleState.STOPPING; -import static org.briarproject.bramble.api.record.Record.MAX_RECORD_PAYLOAD_BYTES; +import static org.briarproject.bramble.api.record.Record.RECORD_HEADER_BYTES; import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS; +import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_LENGTH; import static org.briarproject.bramble.api.sync.SyncConstants.SUPPORTED_VERSIONS; import static org.briarproject.bramble.util.LogUtils.logException; @@ -71,6 +74,16 @@ class DuplexOutgoingSession implements SyncSession, EventListener { NEXT_SEND_TIME_DECREASED = () -> { }; + /** + * The batch capacity must be at least {@link Record#RECORD_HEADER_BYTES} + * + {@link SyncConstants#MAX_MESSAGE_LENGTH} to ensure that maximum-size + * messages can be selected for transmission. Larger batches will mean + * fewer round-trips between the DB and the output stream, but each + * round-trip will block the DB for longer. + */ + private static final int BATCH_CAPACITY = + (RECORD_HEADER_BYTES + MAX_MESSAGE_LENGTH) * 2; + private final DatabaseComponent db; private final Executor dbExecutor; private final EventBus eventBus; @@ -296,8 +309,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener { db.transactionWithNullableResult(false, txn -> { Collection batch = db.generateRequestedBatch(txn, contactId, - MAX_RECORD_PAYLOAD_BYTES, - maxLatency); + BATCH_CAPACITY, maxLatency); setNextSendTime(db.getNextSendTime(txn, contactId)); return batch; }); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/EagerSimplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/EagerSimplexOutgoingSession.java new file mode 100644 index 000000000..1cc62de9f --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/EagerSimplexOutgoingSession.java @@ -0,0 +1,66 @@ +package org.briarproject.bramble.sync; + +import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.db.DatabaseComponent; +import org.briarproject.bramble.api.db.DbException; +import org.briarproject.bramble.api.event.EventBus; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.TransportId; +import org.briarproject.bramble.api.sync.Message; +import org.briarproject.bramble.api.sync.MessageId; +import org.briarproject.bramble.api.sync.SyncRecordWriter; +import org.briarproject.bramble.api.transport.StreamWriter; + +import java.io.IOException; +import java.util.Collection; +import java.util.logging.Logger; + +import javax.annotation.concurrent.ThreadSafe; + +import static java.util.logging.Level.INFO; +import static java.util.logging.Logger.getLogger; + +/** + * A {@link SimplexOutgoingSession} that sends messages eagerly, ie + * regardless of whether they're due for retransmission. + */ +@ThreadSafe +@NotNullByDefault +class EagerSimplexOutgoingSession extends SimplexOutgoingSession { + + private static final Logger LOG = + getLogger(EagerSimplexOutgoingSession.class.getName()); + + EagerSimplexOutgoingSession(DatabaseComponent db, + EventBus eventBus, + ContactId contactId, + TransportId transportId, + long maxLatency, + StreamWriter streamWriter, + SyncRecordWriter recordWriter) { + super(db, eventBus, contactId, transportId, maxLatency, streamWriter, + recordWriter); + } + + @Override + void sendMessages() throws DbException, IOException { + for (MessageId m : loadUnackedMessageIdsToSend()) { + if (isInterrupted()) break; + Message message = db.transactionWithNullableResult(false, txn -> + db.getMessageToSend(txn, contactId, m, maxLatency, true)); + if (message == null) continue; // No longer shared + recordWriter.writeMessage(message); + LOG.info("Sent message"); + } + } + + private Collection loadUnackedMessageIdsToSend() + throws DbException { + Collection ids = db.transactionWithResult(true, txn -> + db.getUnackedMessagesToSend(txn, contactId)); + if (LOG.isLoggable(INFO)) { + LOG.info(ids.size() + " unacked messages to send"); + } + return ids; + } +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java new file mode 100644 index 000000000..a8250475f --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java @@ -0,0 +1,117 @@ +package org.briarproject.bramble.sync; + +import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.db.DatabaseComponent; +import org.briarproject.bramble.api.db.DbException; +import org.briarproject.bramble.api.event.EventBus; +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.plugin.TransportId; +import org.briarproject.bramble.api.sync.Ack; +import org.briarproject.bramble.api.sync.DeferredSendHandler; +import org.briarproject.bramble.api.sync.Message; +import org.briarproject.bramble.api.sync.MessageId; +import org.briarproject.bramble.api.sync.SyncRecordWriter; +import org.briarproject.bramble.api.transport.StreamWriter; + +import java.io.IOException; +import java.util.Collection; +import java.util.logging.Logger; + +import javax.annotation.concurrent.ThreadSafe; + +import static java.lang.Math.min; +import static java.util.Collections.emptyList; +import static java.util.logging.Level.INFO; +import static java.util.logging.Logger.getLogger; +import static org.briarproject.bramble.api.record.Record.RECORD_HEADER_BYTES; +import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS; +import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH; + +/** + * A {@link SimplexOutgoingSession} for sending and acking messages via a + * mailbox. The session uses a {@link DeferredSendHandler} to record the IDs + * of the messages sent and acked during the session so that they can be + * recorded in the DB as sent or acked after the file has been successfully + * uploaded to the mailbox. + */ +@ThreadSafe +@NotNullByDefault +class MailboxOutgoingSession extends SimplexOutgoingSession { + + private static final Logger LOG = + getLogger(MailboxOutgoingSession.class.getName()); + + private final DeferredSendHandler deferredSendHandler; + private final int initialCapacity; + + MailboxOutgoingSession(DatabaseComponent db, + EventBus eventBus, + ContactId contactId, + TransportId transportId, + long maxLatency, + StreamWriter streamWriter, + SyncRecordWriter recordWriter, + DeferredSendHandler deferredSendHandler, + int capacity) { + super(db, eventBus, contactId, transportId, maxLatency, streamWriter, + recordWriter); + this.deferredSendHandler = deferredSendHandler; + this.initialCapacity = capacity; + } + + @Override + void sendAcks() throws DbException, IOException { + while (!isInterrupted()) { + Collection idsToAck = loadMessageIdsToAck(); + if (idsToAck.isEmpty()) break; + recordWriter.writeAck(new Ack(idsToAck)); + deferredSendHandler.onAckSent(idsToAck); + LOG.info("Sent ack"); + } + } + + private Collection loadMessageIdsToAck() throws DbException { + int idCapacity = (getRemainingCapacity() - RECORD_HEADER_BYTES) + / MessageId.LENGTH; + if (idCapacity <= 0) return emptyList(); // Out of capacity + int maxMessageIds = min(idCapacity, MAX_MESSAGE_IDS); + Collection ids = db.transactionWithResult(true, txn -> + db.getMessagesToAck(txn, contactId, maxMessageIds)); + if (LOG.isLoggable(INFO)) { + LOG.info(ids.size() + " messages to ack"); + } + return ids; + } + + private int getRemainingCapacity() { + return initialCapacity - (int) recordWriter.getBytesWritten(); + } + + @Override + void sendMessages() throws DbException, IOException { + for (MessageId m : loadMessageIdsToSend()) { + if (isInterrupted()) break; + // Defer marking the message as sent + Message message = db.transactionWithNullableResult(true, txn -> + db.getMessageToSend(txn, contactId, m, maxLatency, false)); + if (message == null) continue; // No longer shared + recordWriter.writeMessage(message); + deferredSendHandler.onMessageSent(m); + LOG.info("Sent message"); + } + } + + private Collection loadMessageIdsToSend() throws DbException { + int capacity = getRemainingCapacity(); + if (capacity < RECORD_HEADER_BYTES + MESSAGE_HEADER_LENGTH) { + return emptyList(); // Out of capacity + } + Collection ids = db.transactionWithResult(true, txn -> + db.getMessagesToSend(txn, contactId, capacity, maxLatency)); + if (LOG.isLoggable(INFO)) { + LOG.info(ids.size() + " messages to send"); + } + return ids; + } + +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java index 3e6075614..a9efdcabe 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java @@ -12,9 +12,10 @@ import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.plugin.event.TransportInactiveEvent; +import org.briarproject.bramble.api.record.Record; import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Message; -import org.briarproject.bramble.api.sync.MessageId; +import org.briarproject.bramble.api.sync.SyncConstants; import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.Versions; @@ -22,11 +23,7 @@ import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent; import org.briarproject.bramble.api.transport.StreamWriter; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; import java.util.logging.Logger; import javax.annotation.concurrent.ThreadSafe; @@ -35,8 +32,9 @@ import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; import static java.util.logging.Logger.getLogger; import static org.briarproject.bramble.api.lifecycle.LifecycleManager.LifecycleState.STOPPING; -import static org.briarproject.bramble.api.record.Record.MAX_RECORD_PAYLOAD_BYTES; +import static org.briarproject.bramble.api.record.Record.RECORD_HEADER_BYTES; import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS; +import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_LENGTH; import static org.briarproject.bramble.api.sync.SyncConstants.SUPPORTED_VERSIONS; import static org.briarproject.bramble.util.LogUtils.logException; @@ -52,14 +50,23 @@ class SimplexOutgoingSession implements SyncSession, EventListener { private static final Logger LOG = getLogger(SimplexOutgoingSession.class.getName()); - private final DatabaseComponent db; - private final EventBus eventBus; - private final ContactId contactId; - private final TransportId transportId; - private final long maxLatency; - private final boolean eager; - private final StreamWriter streamWriter; - private final SyncRecordWriter recordWriter; + /** + * The batch capacity must be at least {@link Record#RECORD_HEADER_BYTES} + * + {@link SyncConstants#MAX_MESSAGE_LENGTH} to ensure that maximum-size + * messages can be selected for transmission. Larger batches will mean + * fewer round-trips between the DB and the output stream, but each + * round-trip will block the DB for longer. + */ + static final int BATCH_CAPACITY = + (RECORD_HEADER_BYTES + MAX_MESSAGE_LENGTH) * 2; + + protected final DatabaseComponent db; + protected final EventBus eventBus; + protected final ContactId contactId; + protected final TransportId transportId; + protected final long maxLatency; + protected final StreamWriter streamWriter; + protected final SyncRecordWriter recordWriter; private volatile boolean interrupted = false; @@ -68,7 +75,6 @@ class SimplexOutgoingSession implements SyncSession, EventListener { ContactId contactId, TransportId transportId, long maxLatency, - boolean eager, StreamWriter streamWriter, SyncRecordWriter recordWriter) { this.db = db; @@ -76,7 +82,6 @@ class SimplexOutgoingSession implements SyncSession, EventListener { this.contactId = contactId; this.transportId = transportId; this.maxLatency = maxLatency; - this.eager = eager; this.streamWriter = streamWriter; this.recordWriter = recordWriter; } @@ -89,17 +94,8 @@ class SimplexOutgoingSession implements SyncSession, EventListener { // Send our supported protocol versions recordWriter.writeVersions(new Versions(SUPPORTED_VERSIONS)); try { - // Send any waiting acks - while (!interrupted) if (!generateAndWriteAck()) break; - // Send any waiting messages - if (eager) { - Map ids = loadUnackedMessageIds(); - while (!interrupted && !ids.isEmpty()) { - generateAndWriteEagerBatch(ids); - } - } else { - while (!interrupted) if (!generateAndWriteBatch()) break; - } + sendAcks(); + sendMessages(); } catch (DbException e) { logException(LOG, WARNING, e); } @@ -114,6 +110,10 @@ class SimplexOutgoingSession implements SyncSession, EventListener { interrupted = true; } + boolean isInterrupted() { + return interrupted; + } + @Override public void eventOccurred(Event e) { if (e instanceof ContactRemovedEvent) { @@ -131,59 +131,31 @@ class SimplexOutgoingSession implements SyncSession, EventListener { } } - private Map loadUnackedMessageIds() throws DbException { - Map ids = db.transactionWithResult(true, txn -> - db.getUnackedMessagesToSend(txn, contactId)); - if (LOG.isLoggable(INFO)) { - LOG.info(ids.size() + " unacked messages to send"); - } - return ids; + void sendAcks() throws DbException, IOException { + while (!isInterrupted()) if (!generateAndSendAck()) break; } - private void generateAndWriteEagerBatch(Map ids) - throws DbException, IOException { - // Take some message IDs from `ids` to form a batch - Collection batchIds = new ArrayList<>(); - long totalLength = 0; - Iterator> it = ids.entrySet().iterator(); - while (it.hasNext()) { - // Check whether the next message will fit in the batch - Entry e = it.next(); - int length = e.getValue(); - if (totalLength + length > MAX_RECORD_PAYLOAD_BYTES) break; - // Add the message to the batch - it.remove(); - batchIds.add(e.getKey()); - totalLength += length; - } - if (batchIds.isEmpty()) throw new AssertionError(); - Collection b = db.transactionWithResult(false, txn -> - db.generateBatch(txn, contactId, batchIds, maxLatency)); - // The batch may be empty if some of the messages are no longer shared - if (!b.isEmpty()) { - for (Message m : b) recordWriter.writeMessage(m); - LOG.info("Sent eager batch"); - } - } - - private boolean generateAndWriteAck() throws DbException, IOException { + private boolean generateAndSendAck() throws DbException, IOException { Ack a = db.transactionWithNullableResult(false, txn -> db.generateAck(txn, contactId, MAX_MESSAGE_IDS)); if (LOG.isLoggable(INFO)) LOG.info("Generated ack: " + (a != null)); - if (a == null) return false; + if (a == null) return false; // No more acks to send recordWriter.writeAck(a); LOG.info("Sent ack"); return true; } - private boolean generateAndWriteBatch() throws DbException, IOException { + void sendMessages() throws DbException, IOException { + while (!isInterrupted()) if (!generateAndSendBatch()) break; + } + + private boolean generateAndSendBatch() throws DbException, IOException { Collection b = db.transactionWithNullableResult(false, txn -> - db.generateBatch(txn, contactId, - MAX_RECORD_PAYLOAD_BYTES, maxLatency)); + db.generateBatch(txn, contactId, BATCH_CAPACITY, maxLatency)); if (LOG.isLoggable(INFO)) LOG.info("Generated batch: " + (b != null)); - if (b == null) return false; + if (b == null) return false; // No more messages to send for (Message m : b) recordWriter.writeMessage(m); LOG.info("Sent batch"); return true; diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncRecordWriterImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncRecordWriterImpl.java index d2b4e73f1..38964b640 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncRecordWriterImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncRecordWriterImpl.java @@ -85,4 +85,9 @@ class SyncRecordWriterImpl implements SyncRecordWriter { public void flush() throws IOException { writer.flush(); } + + @Override + public long getBytesWritten() { + return writer.getBytesWritten(); + } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java index 38aa6daa7..fef6d7e45 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java @@ -64,8 +64,13 @@ class SyncSessionFactoryImpl implements SyncSessionFactory { OutputStream out = streamWriter.getOutputStream(); SyncRecordWriter recordWriter = recordWriterFactory.createRecordWriter(out); - return new SimplexOutgoingSession(db, eventBus, c, t, maxLatency, - eager, streamWriter, recordWriter); + if (eager) { + return new EagerSimplexOutgoingSession(db, eventBus, c, t, + maxLatency, streamWriter, recordWriter); + } else { + return new SimplexOutgoingSession(db, eventBus, c, t, + maxLatency, streamWriter, recordWriter); + } } @Override diff --git a/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java b/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java index 3af7a791a..e288e5507 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java @@ -72,6 +72,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; import static java.util.concurrent.TimeUnit.HOURS; import static org.briarproject.bramble.api.db.DatabaseComponent.TIMER_NOT_STARTED; +import static org.briarproject.bramble.api.record.Record.RECORD_HEADER_BYTES; import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE; import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED; import static org.briarproject.bramble.api.sync.Group.Visibility.VISIBLE; @@ -94,11 +95,15 @@ import static org.briarproject.bramble.test.TestUtils.getTransportId; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class DatabaseComponentImplTest extends BrambleMockTestCase { + private static final int BATCH_CAPACITY = + (RECORD_HEADER_BYTES + MAX_MESSAGE_LENGTH) * 2; + @SuppressWarnings("unchecked") private final Database database = context.mock(Database.class); private final ShutdownManager shutdownManager = @@ -298,11 +303,11 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { throws Exception { context.checking(new Expectations() {{ // Check whether the contact is in the DB (which it's not) - exactly(19).of(database).startTransaction(); + exactly(25).of(database).startTransaction(); will(returnValue(txn)); - exactly(19).of(database).containsContact(txn, contactId); + exactly(25).of(database).containsContact(txn, contactId); will(returnValue(false)); - exactly(19).of(database).abortTransaction(txn); + exactly(25).of(database).abortTransaction(txn); }}); DatabaseComponent db = createDatabaseComponent(database, eventBus, eventExecutor, shutdownManager); @@ -356,6 +361,39 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { // Expected } + try { + db.transaction(false, transaction -> + db.getMessageToSend(transaction, contactId, messageId, 123, + true)); + fail(); + } catch (NoSuchContactException expected) { + // Expected + } + + try { + db.transaction(true, transaction -> + db.getMessagesToAck(transaction, contactId, 123)); + fail(); + } catch (NoSuchContactException expected) { + // Expected + } + + try { + db.transaction(true, transaction -> + db.getMessagesToSend(transaction, contactId, 123, 456)); + fail(); + } catch (NoSuchContactException expected) { + // Expected + } + + try { + db.transaction(true, transaction -> + db.getUnackedMessagesToSend(transaction, contactId)); + fail(); + } catch (NoSuchContactException expected) { + // Expected + } + try { db.transaction(true, transaction -> db.getUnackedMessageBytesToSend(transaction, contactId)); @@ -439,6 +477,15 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { // Expected } + try { + db.transaction(false, transaction -> + db.setAckSent(transaction, contactId, + singletonList(messageId))); + fail(); + } catch (NoSuchContactException expected) { + // Expected + } + try { db.transaction(false, transaction -> db.setContactAlias(transaction, contactId, alias)); @@ -456,6 +503,15 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { // Expected } + try { + db.transaction(false, transaction -> + db.setMessagesSent(transaction, contactId, + singletonList(messageId), 123)); + fail(); + } catch (NoSuchContactException expected) { + // Expected + } + try { db.transaction(false, transaction -> db.setSyncVersions(transaction, contactId, emptyList())); @@ -918,12 +974,14 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { oneOf(database).containsContact(txn, contactId); will(returnValue(true)); oneOf(database).getMessagesToSend(txn, contactId, - MAX_MESSAGE_LENGTH * 2, maxLatency); + BATCH_CAPACITY, maxLatency); will(returnValue(ids)); + // First message oneOf(database).getMessage(txn, messageId); will(returnValue(message)); oneOf(database).updateRetransmissionData(txn, contactId, messageId, maxLatency); + // Second message oneOf(database).getMessage(txn, messageId1); will(returnValue(message1)); oneOf(database).updateRetransmissionData(txn, contactId, messageId1, @@ -937,7 +995,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { db.transaction(false, transaction -> assertEquals(messages, db.generateBatch(transaction, contactId, - MAX_MESSAGE_LENGTH * 2, maxLatency))); + BATCH_CAPACITY, maxLatency))); } @Test @@ -1001,12 +1059,14 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { oneOf(database).containsContact(txn, contactId); will(returnValue(true)); oneOf(database).getRequestedMessagesToSend(txn, contactId, - MAX_MESSAGE_LENGTH * 2, maxLatency); + BATCH_CAPACITY, maxLatency); will(returnValue(ids)); + // First message oneOf(database).getMessage(txn, messageId); will(returnValue(message)); oneOf(database).updateRetransmissionData(txn, contactId, messageId, maxLatency); + // Second message oneOf(database).getMessage(txn, messageId1); will(returnValue(message1)); oneOf(database).updateRetransmissionData(txn, contactId, @@ -1020,7 +1080,73 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { db.transaction(false, transaction -> assertEquals(messages, db.generateRequestedBatch(transaction, - contactId, MAX_MESSAGE_LENGTH * 2, maxLatency))); + contactId, BATCH_CAPACITY, maxLatency))); + } + + @Test + public void testGetMessageToSendMessageNotVisible() throws Exception { + context.checking(new Expectations() {{ + oneOf(database).startTransaction(); + will(returnValue(txn)); + oneOf(database).containsContact(txn, contactId); + will(returnValue(true)); + oneOf(database).containsVisibleMessage(txn, contactId, messageId); + will(returnValue(false)); + oneOf(database).commitTransaction(txn); + }}); + DatabaseComponent db = createDatabaseComponent(database, eventBus, + eventExecutor, shutdownManager); + + db.transaction(false, transaction -> + assertNull(db.getMessageToSend(transaction, contactId, + messageId, maxLatency, false))); + } + + @Test + public void testGetMessageToSendMessageNotMarkedAsSent() throws Exception { + context.checking(new Expectations() {{ + oneOf(database).startTransaction(); + will(returnValue(txn)); + oneOf(database).containsContact(txn, contactId); + will(returnValue(true)); + oneOf(database).containsVisibleMessage(txn, contactId, messageId); + will(returnValue(true)); + oneOf(database).getMessage(txn, messageId); + will(returnValue(message)); + oneOf(database).commitTransaction(txn); + }}); + DatabaseComponent db = createDatabaseComponent(database, eventBus, + eventExecutor, shutdownManager); + + db.transaction(false, transaction -> + assertEquals(message, db.getMessageToSend(transaction, + contactId, messageId, maxLatency, false))); + } + + @Test + public void testGetMessageToSendMessageMarkedAsSent() throws Exception { + context.checking(new Expectations() {{ + oneOf(database).startTransaction(); + will(returnValue(txn)); + oneOf(database).containsContact(txn, contactId); + will(returnValue(true)); + oneOf(database).containsVisibleMessage(txn, contactId, messageId); + will(returnValue(true)); + oneOf(database).getMessage(txn, messageId); + will(returnValue(message)); + oneOf(database).updateRetransmissionData(txn, contactId, messageId, + maxLatency); + oneOf(database).lowerRequestedFlag(txn, contactId, + singletonList(messageId)); + oneOf(database).commitTransaction(txn); + oneOf(eventBus).broadcast(with(any(MessagesSentEvent.class))); + }}); + DatabaseComponent db = createDatabaseComponent(database, eventBus, + eventExecutor, shutdownManager); + + db.transaction(false, transaction -> + assertEquals(message, db.getMessageToSend(transaction, + contactId, messageId, maxLatency, true))); } @Test @@ -1245,6 +1371,62 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { db.receiveRequest(transaction, contactId, r)); } + @Test + public void testSetAckSent() throws Exception { + Collection acked = asList(messageId, messageId1); + context.checking(new Expectations() {{ + oneOf(database).startTransaction(); + will(returnValue(txn)); + oneOf(database).containsContact(txn, contactId); + will(returnValue(true)); + // First message is still visible to the contact - flag lowered + oneOf(database).containsVisibleMessage(txn, contactId, messageId); + will(returnValue(true)); + // Second message is no longer visible - flag not lowered + oneOf(database).containsVisibleMessage(txn, contactId, messageId1); + will(returnValue(false)); + oneOf(database) + .lowerAckFlag(txn, contactId, singletonList(messageId)); + oneOf(database).commitTransaction(txn); + }}); + DatabaseComponent db = createDatabaseComponent(database, eventBus, + eventExecutor, shutdownManager); + + db.transaction(false, transaction -> + db.setAckSent(transaction, contactId, acked)); + } + + @Test + public void testSetMessagesSent() throws Exception { + long maxLatency = 123456; + Collection sent = asList(messageId, messageId1); + context.checking(new Expectations() {{ + oneOf(database).startTransaction(); + will(returnValue(txn)); + oneOf(database).containsContact(txn, contactId); + will(returnValue(true)); + // First message is still visible to the contact - mark as sent + oneOf(database).containsVisibleMessage(txn, contactId, messageId); + will(returnValue(true)); + oneOf(database).getMessageLength(txn, messageId); + will(returnValue(message.getRawLength())); + oneOf(database).updateRetransmissionData(txn, contactId, messageId, + maxLatency); + // Second message is no longer visible - don't mark as sent + oneOf(database).containsVisibleMessage(txn, contactId, messageId1); + will(returnValue(false)); + oneOf(database).lowerRequestedFlag(txn, contactId, + singletonList(messageId)); + oneOf(database).commitTransaction(txn); + oneOf(eventBus).broadcast(with(any(MessagesSentEvent.class))); + }}); + DatabaseComponent db = createDatabaseComponent(database, eventBus, + eventExecutor, shutdownManager); + + db.transaction(false, transaction -> + db.setMessagesSent(transaction, contactId, sent, maxLatency)); + } + @Test public void testChangingVisibilityFromInvisibleToVisibleCallsListeners() throws Exception { diff --git a/bramble-core/src/test/java/org/briarproject/bramble/db/DatabasePerformanceTest.java b/bramble-core/src/test/java/org/briarproject/bramble/db/DatabasePerformanceTest.java index 3e76c794f..09284170a 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/db/DatabasePerformanceTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/db/DatabasePerformanceTest.java @@ -33,7 +33,9 @@ import java.util.Random; import java.util.logging.Logger; import static java.util.logging.Level.OFF; +import static org.briarproject.bramble.api.record.Record.RECORD_HEADER_BYTES; import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS; +import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_LENGTH; import static org.briarproject.bramble.api.sync.validation.MessageState.DELIVERED; import static org.briarproject.bramble.test.TestUtils.deleteTestDirectory; import static org.briarproject.bramble.test.TestUtils.getAuthor; @@ -97,6 +99,9 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase { // All our transports use a maximum latency of 30 seconds private static final int MAX_LATENCY = 30 * 1000; + private static final int BATCH_CAPACITY = + (RECORD_HEADER_BYTES + MAX_MESSAGE_LENGTH) * 2; + protected final File testDir = getTestDirectory(); private final File resultsFile = new File(getTestName() + ".tsv"); protected final Random random = new Random(); @@ -471,7 +476,7 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase { benchmark(name, db -> { Connection txn = db.startTransaction(); db.getMessagesToSend(txn, pickRandom(contacts).getId(), - MAX_MESSAGE_IDS, MAX_LATENCY); + BATCH_CAPACITY, MAX_LATENCY); db.commitTransaction(txn); }); } @@ -522,7 +527,7 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase { benchmark(name, db -> { Connection txn = db.startTransaction(); db.getRequestedMessagesToSend(txn, pickRandom(contacts).getId(), - MAX_MESSAGE_IDS, MAX_LATENCY); + BATCH_CAPACITY, MAX_LATENCY); db.commitTransaction(txn); }); } diff --git a/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java b/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java index 259b73ecf..c52dfba8f 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java @@ -57,7 +57,6 @@ import java.util.concurrent.atomic.AtomicLong; import static java.util.Arrays.asList; import static java.util.Collections.emptyList; -import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static java.util.concurrent.TimeUnit.SECONDS; @@ -65,6 +64,7 @@ import static org.briarproject.bramble.api.db.DatabaseComponent.NO_CLEANUP_DEADL import static org.briarproject.bramble.api.db.DatabaseComponent.TIMER_NOT_STARTED; import static org.briarproject.bramble.api.db.Metadata.REMOVE; import static org.briarproject.bramble.api.identity.AuthorConstants.MAX_AUTHOR_NAME_LENGTH; +import static org.briarproject.bramble.api.record.Record.RECORD_HEADER_BYTES; import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE; import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED; import static org.briarproject.bramble.api.sync.Group.Visibility.VISIBLE; @@ -350,14 +350,14 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { // The message is sendable, but too large to send assertOneMessageToSendLazily(db, txn); assertOneMessageToSendEagerly(db, txn); + int capacity = RECORD_HEADER_BYTES + message.getRawLength() - 1; Collection ids = - db.getMessagesToSend(txn, contactId, message.getRawLength() - 1, - MAX_LATENCY); + db.getMessagesToSend(txn, contactId, capacity, MAX_LATENCY); assertTrue(ids.isEmpty()); // The message is just the right size to send - ids = db.getMessagesToSend(txn, contactId, message.getRawLength(), - MAX_LATENCY); + capacity = RECORD_HEADER_BYTES + message.getRawLength(); + ids = db.getMessagesToSend(txn, contactId, capacity, MAX_LATENCY); assertEquals(singletonList(messageId), ids); db.commitTransaction(txn); @@ -396,16 +396,15 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { Collection ids = db.getMessagesToAck(txn, contactId, 1234); assertEquals(asList(messageId, messageId1), ids); - // Remove both message IDs + // Lower the ack flag db.lowerAckFlag(txn, contactId, asList(messageId, messageId1)); - // Both message IDs should have been removed + // No message IDs should be returned assertFalse( db.containsAnythingToSend(txn, contactId, MAX_LATENCY, false)); assertFalse( db.containsAnythingToSend(txn, contactId, MAX_LATENCY, true)); - assertEquals(emptyList(), db.getMessagesToAck(txn, - contactId, 1234)); + assertEquals(emptyList(), db.getMessagesToAck(txn, contactId, 1234)); // Raise the ack flag again db.raiseAckFlag(txn, contactId, messageId); @@ -2603,7 +2602,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { Connection txn) throws Exception { assertFalse( db.containsAnythingToSend(txn, contactId, MAX_LATENCY, true)); - Map unacked = + Collection unacked = db.getUnackedMessagesToSend(txn, contactId); assertTrue(unacked.isEmpty()); assertEquals(0, db.getUnackedMessageBytesToSend(txn, contactId)); @@ -2613,10 +2612,9 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { Connection txn) throws Exception { assertTrue( db.containsAnythingToSend(txn, contactId, MAX_LATENCY, true)); - Map unacked = + Collection unacked = db.getUnackedMessagesToSend(txn, contactId); - assertEquals(singleton(messageId), unacked.keySet()); - assertEquals(message.getRawLength(), unacked.get(messageId).intValue()); + assertEquals(singletonList(messageId), unacked); assertEquals(message.getRawLength(), db.getUnackedMessageBytesToSend(txn, contactId)); } diff --git a/bramble-core/src/test/java/org/briarproject/bramble/sync/EagerSimplexOutgoingSessionTest.java b/bramble-core/src/test/java/org/briarproject/bramble/sync/EagerSimplexOutgoingSessionTest.java new file mode 100644 index 000000000..2e8f52786 --- /dev/null +++ b/bramble-core/src/test/java/org/briarproject/bramble/sync/EagerSimplexOutgoingSessionTest.java @@ -0,0 +1,134 @@ +package org.briarproject.bramble.sync; + +import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.db.DatabaseComponent; +import org.briarproject.bramble.api.db.Transaction; +import org.briarproject.bramble.api.event.EventBus; +import org.briarproject.bramble.api.plugin.TransportId; +import org.briarproject.bramble.api.sync.Ack; +import org.briarproject.bramble.api.sync.GroupId; +import org.briarproject.bramble.api.sync.Message; +import org.briarproject.bramble.api.sync.MessageId; +import org.briarproject.bramble.api.sync.SyncRecordWriter; +import org.briarproject.bramble.api.sync.Versions; +import org.briarproject.bramble.api.transport.StreamWriter; +import org.briarproject.bramble.test.BrambleMockTestCase; +import org.briarproject.bramble.test.DbExpectations; +import org.junit.Test; + +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_BODY_LENGTH; +import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS; +import static org.briarproject.bramble.test.TestUtils.getContactId; +import static org.briarproject.bramble.test.TestUtils.getMessage; +import static org.briarproject.bramble.test.TestUtils.getRandomId; +import static org.briarproject.bramble.test.TestUtils.getTransportId; + +public class EagerSimplexOutgoingSessionTest extends BrambleMockTestCase { + + private static final int MAX_LATENCY = Integer.MAX_VALUE; + + private final DatabaseComponent db = context.mock(DatabaseComponent.class); + private final EventBus eventBus = context.mock(EventBus.class); + private final StreamWriter streamWriter = context.mock(StreamWriter.class); + private final SyncRecordWriter recordWriter = + context.mock(SyncRecordWriter.class); + + private final ContactId contactId = getContactId(); + private final TransportId transportId = getTransportId(); + private final Ack ack = + new Ack(singletonList(new MessageId(getRandomId()))); + private final Message message = getMessage(new GroupId(getRandomId()), + MAX_MESSAGE_BODY_LENGTH); + private final Message message1 = getMessage(new GroupId(getRandomId()), + MAX_MESSAGE_BODY_LENGTH); + + @Test + public void testNothingToSendEagerly() throws Exception { + EagerSimplexOutgoingSession session = + new EagerSimplexOutgoingSession(db, eventBus, contactId, + transportId, MAX_LATENCY, streamWriter, recordWriter); + + Transaction noAckTxn = new Transaction(null, false); + Transaction noIdsTxn = new Transaction(null, true); + + context.checking(new DbExpectations() {{ + // Add listener + oneOf(eventBus).addListener(session); + // Send the protocol versions + oneOf(recordWriter).writeVersions(with(any(Versions.class))); + // No acks to send + oneOf(db).transactionWithNullableResult(with(false), + withNullableDbCallable(noAckTxn)); + oneOf(db).generateAck(noAckTxn, contactId, MAX_MESSAGE_IDS); + will(returnValue(null)); + // No messages to send + oneOf(db).transactionWithResult(with(true), + withDbCallable(noIdsTxn)); + oneOf(db).getUnackedMessagesToSend(noIdsTxn, contactId); + will(returnValue(emptyList())); + // Send the end of stream marker + oneOf(streamWriter).sendEndOfStream(); + // Remove listener + oneOf(eventBus).removeListener(session); + }}); + + session.run(); + } + + @Test + public void testSomethingToSendEagerly() throws Exception { + EagerSimplexOutgoingSession session = + new EagerSimplexOutgoingSession(db, eventBus, contactId, + transportId, MAX_LATENCY, streamWriter, recordWriter); + + Transaction ackTxn = new Transaction(null, false); + Transaction noAckTxn = new Transaction(null, false); + Transaction idsTxn = new Transaction(null, true); + Transaction msgTxn = new Transaction(null, false); + Transaction msgTxn1 = new Transaction(null, false); + + context.checking(new DbExpectations() {{ + // Add listener + oneOf(eventBus).addListener(session); + // Send the protocol versions + oneOf(recordWriter).writeVersions(with(any(Versions.class))); + // One ack to send + oneOf(db).transactionWithNullableResult(with(false), + withNullableDbCallable(ackTxn)); + oneOf(db).generateAck(ackTxn, contactId, MAX_MESSAGE_IDS); + will(returnValue(ack)); + oneOf(recordWriter).writeAck(ack); + // No more acks + oneOf(db).transactionWithNullableResult(with(false), + withNullableDbCallable(noAckTxn)); + oneOf(db).generateAck(noAckTxn, contactId, MAX_MESSAGE_IDS); + will(returnValue(null)); + // Two messages to send + oneOf(db).transactionWithResult(with(true), withDbCallable(idsTxn)); + oneOf(db).getUnackedMessagesToSend(idsTxn, contactId); + will(returnValue(asList(message.getId(), message1.getId()))); + // Try to send the first message - it's no longer shared + oneOf(db).transactionWithNullableResult(with(false), + withNullableDbCallable(msgTxn)); + oneOf(db).getMessageToSend(msgTxn, contactId, message.getId(), + MAX_LATENCY, true); + will(returnValue(null)); + // Send the second message + oneOf(db).transactionWithNullableResult(with(false), + withNullableDbCallable(msgTxn1)); + oneOf(db).getMessageToSend(msgTxn1, contactId, message1.getId(), + MAX_LATENCY, true); + will(returnValue(message1)); + oneOf(recordWriter).writeMessage(message1); + // Send the end of stream marker + oneOf(streamWriter).sendEndOfStream(); + // Remove listener + oneOf(eventBus).removeListener(session); + }}); + + session.run(); + } +} diff --git a/bramble-core/src/test/java/org/briarproject/bramble/sync/MailboxOutgoingSessionTest.java b/bramble-core/src/test/java/org/briarproject/bramble/sync/MailboxOutgoingSessionTest.java new file mode 100644 index 000000000..0130e25a6 --- /dev/null +++ b/bramble-core/src/test/java/org/briarproject/bramble/sync/MailboxOutgoingSessionTest.java @@ -0,0 +1,231 @@ +package org.briarproject.bramble.sync; + +import org.briarproject.bramble.api.contact.ContactId; +import org.briarproject.bramble.api.db.DatabaseComponent; +import org.briarproject.bramble.api.db.Transaction; +import org.briarproject.bramble.api.event.EventBus; +import org.briarproject.bramble.api.plugin.TransportId; +import org.briarproject.bramble.api.sync.Ack; +import org.briarproject.bramble.api.sync.DeferredSendHandler; +import org.briarproject.bramble.api.sync.GroupId; +import org.briarproject.bramble.api.sync.Message; +import org.briarproject.bramble.api.sync.MessageId; +import org.briarproject.bramble.api.sync.SyncRecordWriter; +import org.briarproject.bramble.api.sync.Versions; +import org.briarproject.bramble.api.transport.StreamWriter; +import org.briarproject.bramble.test.BrambleMockTestCase; +import org.briarproject.bramble.test.DbExpectations; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.briarproject.bramble.api.mailbox.MailboxConstants.MAX_FILE_PAYLOAD_BYTES; +import static org.briarproject.bramble.api.record.Record.RECORD_HEADER_BYTES; +import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_BODY_LENGTH; +import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS; +import static org.briarproject.bramble.test.TestUtils.getContactId; +import static org.briarproject.bramble.test.TestUtils.getMessage; +import static org.briarproject.bramble.test.TestUtils.getRandomId; +import static org.briarproject.bramble.test.TestUtils.getTransportId; + +public class MailboxOutgoingSessionTest extends BrambleMockTestCase { + + private static final int MAX_LATENCY = Integer.MAX_VALUE; + + private final DatabaseComponent db = context.mock(DatabaseComponent.class); + private final EventBus eventBus = context.mock(EventBus.class); + private final StreamWriter streamWriter = context.mock(StreamWriter.class); + private final SyncRecordWriter recordWriter = + context.mock(SyncRecordWriter.class); + private final DeferredSendHandler deferredSendHandler = + context.mock(DeferredSendHandler.class); + + private final ContactId contactId = getContactId(); + private final TransportId transportId = getTransportId(); + private final Message message = getMessage(new GroupId(getRandomId()), + MAX_MESSAGE_BODY_LENGTH); + private final Message message1 = getMessage(new GroupId(getRandomId()), + MAX_MESSAGE_BODY_LENGTH); + private final int versionRecordBytes = RECORD_HEADER_BYTES + 1; + + @Test + public void testNothingToSend() throws Exception { + MailboxOutgoingSession session = new MailboxOutgoingSession(db, + eventBus, contactId, transportId, MAX_LATENCY, + streamWriter, recordWriter, deferredSendHandler, + MAX_FILE_PAYLOAD_BYTES); + + Transaction noAckIdTxn = new Transaction(null, true); + Transaction noMsgIdTxn = new Transaction(null, true); + + int capacityForMessages = MAX_FILE_PAYLOAD_BYTES - versionRecordBytes; + + context.checking(new DbExpectations() {{ + // Add listener + oneOf(eventBus).addListener(session); + // Send the protocol versions + oneOf(recordWriter).writeVersions(with(any(Versions.class))); + // Calculate capacity for acks + oneOf(recordWriter).getBytesWritten(); + will(returnValue((long) versionRecordBytes)); + // No messages to ack + oneOf(db).transactionWithResult(with(true), + withDbCallable(noAckIdTxn)); + oneOf(db).getMessagesToAck(noAckIdTxn, contactId, MAX_MESSAGE_IDS); + will(returnValue(emptyList())); + // Calculate capacity for messages + oneOf(recordWriter).getBytesWritten(); + will(returnValue((long) versionRecordBytes)); + // No messages to send + oneOf(db).transactionWithResult(with(true), + withDbCallable(noMsgIdTxn)); + oneOf(db).getMessagesToSend(noMsgIdTxn, contactId, + capacityForMessages, MAX_LATENCY); + will(returnValue(emptyList())); + // Send the end of stream marker + oneOf(streamWriter).sendEndOfStream(); + // Remove listener + oneOf(eventBus).removeListener(session); + }}); + + session.run(); + } + + @Test + public void testSomethingToSend() throws Exception { + MailboxOutgoingSession session = new MailboxOutgoingSession(db, + eventBus, contactId, transportId, MAX_LATENCY, + streamWriter, recordWriter, deferredSendHandler, + MAX_FILE_PAYLOAD_BYTES); + + Transaction ackIdTxn = new Transaction(null, true); + Transaction noAckIdTxn = new Transaction(null, true); + Transaction msgIdTxn = new Transaction(null, true); + Transaction msgTxn = new Transaction(null, true); + + int ackRecordBytes = RECORD_HEADER_BYTES + MessageId.LENGTH; + int capacityForMessages = + MAX_FILE_PAYLOAD_BYTES - versionRecordBytes - ackRecordBytes; + + context.checking(new DbExpectations() {{ + // Add listener + oneOf(eventBus).addListener(session); + // Send the protocol versions + oneOf(recordWriter).writeVersions(with(any(Versions.class))); + // Calculate capacity for acks + oneOf(recordWriter).getBytesWritten(); + will(returnValue((long) versionRecordBytes)); + // One message to ack + oneOf(db).transactionWithResult(with(true), + withDbCallable(ackIdTxn)); + oneOf(db).getMessagesToAck(ackIdTxn, contactId, MAX_MESSAGE_IDS); + will(returnValue(singletonList(message.getId()))); + // Send the ack + oneOf(recordWriter).getBytesWritten(); + will(returnValue((long) versionRecordBytes)); + oneOf(recordWriter).writeAck(with(any(Ack.class))); + oneOf(deferredSendHandler) + .onAckSent(singletonList(message.getId())); + // No more messages to ack + oneOf(db).transactionWithResult(with(true), + withDbCallable(noAckIdTxn)); + oneOf(db).getMessagesToAck(noAckIdTxn, contactId, MAX_MESSAGE_IDS); + will(returnValue(emptyList())); + // Calculate capacity for messages + oneOf(recordWriter).getBytesWritten(); + will(returnValue((long) versionRecordBytes + ackRecordBytes)); + // One message to send + oneOf(db).transactionWithResult(with(true), + withDbCallable(msgIdTxn)); + oneOf(db).getMessagesToSend(msgIdTxn, contactId, + capacityForMessages, MAX_LATENCY); + will(returnValue(singletonList(message1.getId()))); + // Send the message + oneOf(db).transactionWithNullableResult(with(true), + withNullableDbCallable(msgTxn)); + oneOf(db).getMessageToSend(msgTxn, contactId, message1.getId(), + MAX_LATENCY, false); + will(returnValue(message1)); + oneOf(recordWriter).writeMessage(message1); + oneOf(deferredSendHandler).onMessageSent(message1.getId()); + // Send the end of stream marker + oneOf(streamWriter).sendEndOfStream(); + // Remove listener + oneOf(eventBus).removeListener(session); + }}); + + session.run(); + } + + @Test + public void testAllCapacityUsedByAcks() throws Exception { + // The file has enough capacity for a max-size ack record, another + // ack record with one message ID, and a few bytes left over + int capacity = RECORD_HEADER_BYTES + MessageId.LENGTH * MAX_MESSAGE_IDS + + RECORD_HEADER_BYTES + MessageId.LENGTH + MessageId.LENGTH - 1; + + MailboxOutgoingSession session = new MailboxOutgoingSession(db, + eventBus, contactId, transportId, MAX_LATENCY, + streamWriter, recordWriter, deferredSendHandler, capacity); + + Transaction ackIdTxn1 = new Transaction(null, true); + Transaction ackIdTxn2 = new Transaction(null, true); + + int firstAckRecordBytes = + RECORD_HEADER_BYTES + MessageId.LENGTH * MAX_MESSAGE_IDS; + int secondAckRecordBytes = RECORD_HEADER_BYTES + MessageId.LENGTH; + + List idsInFirstAck = new ArrayList<>(MAX_MESSAGE_IDS); + for (int i = 0; i < MAX_MESSAGE_IDS; i++) { + idsInFirstAck.add(new MessageId(getRandomId())); + } + List idsInSecondAck = + singletonList(new MessageId(getRandomId())); + + context.checking(new DbExpectations() {{ + // Add listener + oneOf(eventBus).addListener(session); + // Send the protocol versions + oneOf(recordWriter).writeVersions(with(any(Versions.class))); + // Calculate capacity for acks + oneOf(recordWriter).getBytesWritten(); + will(returnValue((long) versionRecordBytes)); + // Load the IDs for the first ack record + oneOf(db).transactionWithResult(with(true), + withDbCallable(ackIdTxn1)); + oneOf(db).getMessagesToAck(ackIdTxn1, contactId, MAX_MESSAGE_IDS); + will(returnValue(idsInFirstAck)); + // Send the first ack record + oneOf(recordWriter).writeAck(with(any(Ack.class))); + oneOf(deferredSendHandler).onAckSent(idsInFirstAck); + // Calculate remaining capacity for acks + oneOf(recordWriter).getBytesWritten(); + will(returnValue((long) versionRecordBytes + firstAckRecordBytes)); + // Load the IDs for the second ack record + oneOf(db).transactionWithResult(with(true), + withDbCallable(ackIdTxn2)); + oneOf(db).getMessagesToAck(ackIdTxn2, contactId, 1); + will(returnValue(idsInSecondAck)); + // Send the second ack record + oneOf(recordWriter).writeAck(with(any(Ack.class))); + oneOf(deferredSendHandler).onAckSent(idsInSecondAck); + // Not enough capacity left for another ack + oneOf(recordWriter).getBytesWritten(); + will(returnValue((long) versionRecordBytes + firstAckRecordBytes + + secondAckRecordBytes)); + // Not enough capacity left for any messages + oneOf(recordWriter).getBytesWritten(); + will(returnValue((long) versionRecordBytes + firstAckRecordBytes + + secondAckRecordBytes)); + // Send the end of stream marker + oneOf(streamWriter).sendEndOfStream(); + // Remove listener + oneOf(eventBus).removeListener(session); + }}); + + session.run(); + } +} diff --git a/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java b/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java index bd4bc35f0..81dc5461a 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java @@ -16,14 +16,10 @@ import org.briarproject.bramble.test.BrambleMockTestCase; import org.briarproject.bramble.test.DbExpectations; import org.junit.Test; -import java.util.LinkedHashMap; -import java.util.Map; - -import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; -import static org.briarproject.bramble.api.record.Record.MAX_RECORD_PAYLOAD_BYTES; import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_BODY_LENGTH; import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS; +import static org.briarproject.bramble.sync.SimplexOutgoingSession.BATCH_CAPACITY; import static org.briarproject.bramble.test.TestUtils.getContactId; import static org.briarproject.bramble.test.TestUtils.getMessage; import static org.briarproject.bramble.test.TestUtils.getRandomId; @@ -45,14 +41,12 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { new Ack(singletonList(new MessageId(getRandomId()))); private final Message message = getMessage(new GroupId(getRandomId()), MAX_MESSAGE_BODY_LENGTH); - private final Message message1 = getMessage(new GroupId(getRandomId()), - MAX_MESSAGE_BODY_LENGTH); @Test public void testNothingToSend() throws Exception { SimplexOutgoingSession session = new SimplexOutgoingSession(db, eventBus, contactId, transportId, MAX_LATENCY, - false, streamWriter, recordWriter); + streamWriter, recordWriter); Transaction noAckTxn = new Transaction(null, false); Transaction noMsgTxn = new Transaction(null, false); @@ -71,7 +65,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { oneOf(db).transactionWithNullableResult(with(false), withNullableDbCallable(noMsgTxn)); oneOf(db).generateBatch(noMsgTxn, contactId, - MAX_RECORD_PAYLOAD_BYTES, MAX_LATENCY); + BATCH_CAPACITY, MAX_LATENCY); will(returnValue(null)); // Send the end of stream marker oneOf(streamWriter).sendEndOfStream(); @@ -82,44 +76,11 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { session.run(); } - @Test - public void testNothingToSendEagerly() throws Exception { - SimplexOutgoingSession session = new SimplexOutgoingSession(db, - eventBus, contactId, transportId, MAX_LATENCY, - true, streamWriter, recordWriter); - - Transaction noAckTxn = new Transaction(null, false); - Transaction noIdsTxn = new Transaction(null, true); - - context.checking(new DbExpectations() {{ - // Add listener - oneOf(eventBus).addListener(session); - // Send the protocol versions - oneOf(recordWriter).writeVersions(with(any(Versions.class))); - // No acks to send - oneOf(db).transactionWithNullableResult(with(false), - withNullableDbCallable(noAckTxn)); - oneOf(db).generateAck(noAckTxn, contactId, MAX_MESSAGE_IDS); - will(returnValue(null)); - // No messages to send - oneOf(db).transactionWithResult(with(true), - withDbCallable(noIdsTxn)); - oneOf(db).getUnackedMessagesToSend(noIdsTxn, contactId); - will(returnValue(emptyMap())); - // Send the end of stream marker - oneOf(streamWriter).sendEndOfStream(); - // Remove listener - oneOf(eventBus).removeListener(session); - }}); - - session.run(); - } - @Test public void testSomethingToSend() throws Exception { SimplexOutgoingSession session = new SimplexOutgoingSession(db, eventBus, contactId, transportId, MAX_LATENCY, - false, streamWriter, recordWriter); + streamWriter, recordWriter); Transaction ackTxn = new Transaction(null, false); Transaction noAckTxn = new Transaction(null, false); @@ -146,14 +107,14 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { oneOf(db).transactionWithNullableResult(with(false), withNullableDbCallable(msgTxn)); oneOf(db).generateBatch(msgTxn, contactId, - MAX_RECORD_PAYLOAD_BYTES, MAX_LATENCY); + BATCH_CAPACITY, MAX_LATENCY); will(returnValue(singletonList(message))); oneOf(recordWriter).writeMessage(message); // No more messages oneOf(db).transactionWithNullableResult(with(false), withNullableDbCallable(noMsgTxn)); oneOf(db).generateBatch(noMsgTxn, contactId, - MAX_RECORD_PAYLOAD_BYTES, MAX_LATENCY); + BATCH_CAPACITY, MAX_LATENCY); will(returnValue(null)); // Send the end of stream marker oneOf(streamWriter).sendEndOfStream(); @@ -163,63 +124,4 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase { session.run(); } - - @Test - public void testSomethingToSendEagerly() throws Exception { - SimplexOutgoingSession session = new SimplexOutgoingSession(db, - eventBus, contactId, transportId, MAX_LATENCY, - true, streamWriter, recordWriter); - - Map unacked = new LinkedHashMap<>(); - unacked.put(message.getId(), message.getRawLength()); - unacked.put(message1.getId(), message1.getRawLength()); - - Transaction ackTxn = new Transaction(null, false); - Transaction noAckTxn = new Transaction(null, false); - Transaction idsTxn = new Transaction(null, true); - Transaction msgTxn = new Transaction(null, false); - Transaction msgTxn1 = new Transaction(null, false); - - context.checking(new DbExpectations() {{ - // Add listener - oneOf(eventBus).addListener(session); - // Send the protocol versions - oneOf(recordWriter).writeVersions(with(any(Versions.class))); - // One ack to send - oneOf(db).transactionWithNullableResult(with(false), - withNullableDbCallable(ackTxn)); - oneOf(db).generateAck(ackTxn, contactId, MAX_MESSAGE_IDS); - will(returnValue(ack)); - oneOf(recordWriter).writeAck(ack); - // No more acks - oneOf(db).transactionWithNullableResult(with(false), - withNullableDbCallable(noAckTxn)); - oneOf(db).generateAck(noAckTxn, contactId, MAX_MESSAGE_IDS); - will(returnValue(null)); - // Two messages to send - oneOf(db).transactionWithResult(with(true), withDbCallable(idsTxn)); - oneOf(db).getUnackedMessagesToSend(idsTxn, contactId); - will(returnValue(unacked)); - // Send the first message - oneOf(db).transactionWithResult(with(false), - withDbCallable(msgTxn)); - oneOf(db).generateBatch(msgTxn, contactId, - singletonList(message.getId()), MAX_LATENCY); - will(returnValue(singletonList(message))); - oneOf(recordWriter).writeMessage(message); - // Send the second message - oneOf(db).transactionWithResult(with(false), - withDbCallable(msgTxn1)); - oneOf(db).generateBatch(msgTxn1, contactId, - singletonList(message1.getId()), MAX_LATENCY); - will(returnValue(singletonList(message1))); - oneOf(recordWriter).writeMessage(message1); - // Send the end of stream marker - oneOf(streamWriter).sendEndOfStream(); - // Remove listener - oneOf(eventBus).removeListener(session); - }}); - - session.run(); - } } From e614046662e495550601f6eb770792bfd12c7c8a Mon Sep 17 00:00:00 2001 From: akwizgran Date: Wed, 27 Apr 2022 16:32:34 +0100 Subject: [PATCH 4/4] Use longs to represent session capacity. --- .../bramble/api/db/DatabaseComponent.java | 8 ++++---- .../java/org/briarproject/bramble/db/Database.java | 6 +++--- .../bramble/db/DatabaseComponentImpl.java | 6 +++--- .../org/briarproject/bramble/db/JdbcDatabase.java | 4 ++-- .../bramble/sync/MailboxOutgoingSession.java | 14 +++++++------- .../briarproject/bramble/db/JdbcDatabaseTest.java | 2 +- .../bramble/sync/MailboxOutgoingSessionTest.java | 6 +++--- 7 files changed, 23 insertions(+), 23 deletions(-) diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java index 34a2928f3..3869e71a6 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java @@ -208,7 +208,7 @@ public interface DatabaseComponent extends TransactionManager { */ @Nullable Collection generateBatch(Transaction txn, ContactId c, - int capacity, long maxLatency) throws DbException; + long capacity, long maxLatency) throws DbException; /** * Returns an offer for the given contact for transmission over a @@ -237,7 +237,7 @@ public interface DatabaseComponent extends TransactionManager { */ @Nullable Collection generateRequestedBatch(Transaction txn, ContactId c, - int capacity, long maxLatency) throws DbException; + long capacity, long maxLatency) throws DbException; /** * Returns the contact with the given ID. @@ -363,7 +363,7 @@ public interface DatabaseComponent extends TransactionManager { * Read-only. */ Collection getMessagesToSend(Transaction txn, ContactId c, - int capacity, long maxLatency) throws DbException; + long capacity, long maxLatency) throws DbException; /** * Returns the IDs of any messages that need to be validated. @@ -498,7 +498,7 @@ public interface DatabaseComponent extends TransactionManager { * Returns the IDs of all messages that are eligible to be sent to the * given contact. *

- * Unlike {@link #getMessagesToSend(Transaction, ContactId, int, long)} + * Unlike {@link #getMessagesToSend(Transaction, ContactId, long, long)} * this method may return messages that have already been sent and are * not yet due for retransmission. *

diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java b/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java index 1d0c6bedc..e0aaaecc3 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java @@ -511,14 +511,14 @@ interface Database { *

* Read-only. */ - Collection getMessagesToSend(T txn, ContactId c, int capacity, + Collection getMessagesToSend(T txn, ContactId c, long capacity, long maxLatency) throws DbException; /** * Returns the IDs of all messages that are eligible to be sent to the * given contact. *

- * Unlike {@link #getMessagesToSend(Object, ContactId, int, long)} this + * Unlike {@link #getMessagesToSend(Object, ContactId, long, long)} this * method may return messages that have already been sent and are not yet * due for retransmission. *

@@ -612,7 +612,7 @@ interface Database { * Read-only. */ Collection getRequestedMessagesToSend(T txn, ContactId c, - int capacity, long maxLatency) throws DbException; + long capacity, long maxLatency) throws DbException; /** * Returns all settings in the given namespace. diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java index 895e4a0ce..64b6fc3c3 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java @@ -424,7 +424,7 @@ class DatabaseComponentImpl implements DatabaseComponent { @Nullable @Override public Collection generateBatch(Transaction transaction, - ContactId c, int capacity, long maxLatency) throws DbException { + ContactId c, long capacity, long maxLatency) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); T txn = unbox(transaction); if (!db.containsContact(txn, c)) @@ -479,7 +479,7 @@ class DatabaseComponentImpl implements DatabaseComponent { @Nullable @Override public Collection generateRequestedBatch(Transaction transaction, - ContactId c, int capacity, long maxLatency) throws DbException { + ContactId c, long capacity, long maxLatency) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); T txn = unbox(transaction); if (!db.containsContact(txn, c)) @@ -620,7 +620,7 @@ class DatabaseComponentImpl implements DatabaseComponent { @Override public Collection getMessagesToSend(Transaction transaction, - ContactId c, int capacity, long maxLatency) throws DbException { + ContactId c, long capacity, long maxLatency) throws DbException { T txn = unbox(transaction); if (!db.containsContact(txn, c)) throw new NoSuchContactException(); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java index 52d92eb9f..fc988c441 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java @@ -2253,7 +2253,7 @@ abstract class JdbcDatabase implements Database { @Override public Collection getMessagesToSend(Connection txn, - ContactId c, int capacity, long maxLatency) throws DbException { + ContactId c, long capacity, long maxLatency) throws DbException { long now = clock.currentTimeMillis(); PreparedStatement ps = null; ResultSet rs = null; @@ -2548,7 +2548,7 @@ abstract class JdbcDatabase implements Database { @Override public Collection getRequestedMessagesToSend(Connection txn, - ContactId c, int capacity, long maxLatency) throws DbException { + ContactId c, long capacity, long maxLatency) throws DbException { long now = clock.currentTimeMillis(); PreparedStatement ps = null; ResultSet rs = null; diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java index a8250475f..5d4e06fb4 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java @@ -42,7 +42,7 @@ class MailboxOutgoingSession extends SimplexOutgoingSession { getLogger(MailboxOutgoingSession.class.getName()); private final DeferredSendHandler deferredSendHandler; - private final int initialCapacity; + private final long initialCapacity; MailboxOutgoingSession(DatabaseComponent db, EventBus eventBus, @@ -52,7 +52,7 @@ class MailboxOutgoingSession extends SimplexOutgoingSession { StreamWriter streamWriter, SyncRecordWriter recordWriter, DeferredSendHandler deferredSendHandler, - int capacity) { + long capacity) { super(db, eventBus, contactId, transportId, maxLatency, streamWriter, recordWriter); this.deferredSendHandler = deferredSendHandler; @@ -71,10 +71,10 @@ class MailboxOutgoingSession extends SimplexOutgoingSession { } private Collection loadMessageIdsToAck() throws DbException { - int idCapacity = (getRemainingCapacity() - RECORD_HEADER_BYTES) + long idCapacity = (getRemainingCapacity() - RECORD_HEADER_BYTES) / MessageId.LENGTH; if (idCapacity <= 0) return emptyList(); // Out of capacity - int maxMessageIds = min(idCapacity, MAX_MESSAGE_IDS); + int maxMessageIds = (int) min(idCapacity, MAX_MESSAGE_IDS); Collection ids = db.transactionWithResult(true, txn -> db.getMessagesToAck(txn, contactId, maxMessageIds)); if (LOG.isLoggable(INFO)) { @@ -83,8 +83,8 @@ class MailboxOutgoingSession extends SimplexOutgoingSession { return ids; } - private int getRemainingCapacity() { - return initialCapacity - (int) recordWriter.getBytesWritten(); + private long getRemainingCapacity() { + return initialCapacity - recordWriter.getBytesWritten(); } @Override @@ -102,7 +102,7 @@ class MailboxOutgoingSession extends SimplexOutgoingSession { } private Collection loadMessageIdsToSend() throws DbException { - int capacity = getRemainingCapacity(); + long capacity = getRemainingCapacity(); if (capacity < RECORD_HEADER_BYTES + MESSAGE_HEADER_LENGTH) { return emptyList(); // Out of capacity } diff --git a/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java b/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java index c52dfba8f..567a89c60 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java @@ -350,7 +350,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { // The message is sendable, but too large to send assertOneMessageToSendLazily(db, txn); assertOneMessageToSendEagerly(db, txn); - int capacity = RECORD_HEADER_BYTES + message.getRawLength() - 1; + long capacity = RECORD_HEADER_BYTES + message.getRawLength() - 1; Collection ids = db.getMessagesToSend(txn, contactId, capacity, MAX_LATENCY); assertTrue(ids.isEmpty()); diff --git a/bramble-core/src/test/java/org/briarproject/bramble/sync/MailboxOutgoingSessionTest.java b/bramble-core/src/test/java/org/briarproject/bramble/sync/MailboxOutgoingSessionTest.java index 0130e25a6..904882847 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/sync/MailboxOutgoingSessionTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/sync/MailboxOutgoingSessionTest.java @@ -61,7 +61,7 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { Transaction noAckIdTxn = new Transaction(null, true); Transaction noMsgIdTxn = new Transaction(null, true); - int capacityForMessages = MAX_FILE_PAYLOAD_BYTES - versionRecordBytes; + long capacityForMessages = MAX_FILE_PAYLOAD_BYTES - versionRecordBytes; context.checking(new DbExpectations() {{ // Add listener @@ -107,7 +107,7 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { Transaction msgTxn = new Transaction(null, true); int ackRecordBytes = RECORD_HEADER_BYTES + MessageId.LENGTH; - int capacityForMessages = + long capacityForMessages = MAX_FILE_PAYLOAD_BYTES - versionRecordBytes - ackRecordBytes; context.checking(new DbExpectations() {{ @@ -164,7 +164,7 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { public void testAllCapacityUsedByAcks() throws Exception { // The file has enough capacity for a max-size ack record, another // ack record with one message ID, and a few bytes left over - int capacity = RECORD_HEADER_BYTES + MessageId.LENGTH * MAX_MESSAGE_IDS + long capacity = RECORD_HEADER_BYTES + MessageId.LENGTH * MAX_MESSAGE_IDS + RECORD_HEADER_BYTES + MessageId.LENGTH + MessageId.LENGTH - 1; MailboxOutgoingSession session = new MailboxOutgoingSession(db,