diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java
index ea507351b..3869e71a6 100644
--- a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java
+++ b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java
@@ -33,11 +33,18 @@ import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
/**
* Encapsulates the database implementation and exposes high-level operations
* to other components.
+ *
+ * With the exception of the {@link #open(SecretKey, MigrationListener)} and
+ * {@link #close()} methods, which must not be called concurrently, the
+ * database can be accessed from any thread. See {@link TransactionManager}
+ * for locking behaviour.
*/
+@ThreadSafe
@NotNullByDefault
public interface DatabaseComponent extends TransactionManager {
@@ -193,26 +200,15 @@ public interface DatabaseComponent extends TransactionManager {
throws DbException;
/**
- * Returns a batch of messages for the given contact, with a total length
- * less than or equal to the given length, for transmission over a
- * transport with the given maximum latency. Returns null if there are no
- * sendable messages that fit in the given length.
+ * Returns a batch of messages for the given contact, for transmission over
+ * a transport with the given maximum latency. The total length of the
+ * messages, including record headers, will be no more than the given
+ * capacity. Returns null if there are no sendable messages that would fit
+ * in the given capacity.
*/
@Nullable
Collection generateBatch(Transaction txn, ContactId c,
- int maxLength, long maxLatency) throws DbException;
-
- /**
- * Returns a batch of messages for the given contact containing the
- * messages with the given IDs, for transmission over a transport with
- * the given maximum latency.
- *
- * If any of the given messages are not in the database or are not visible
- * to the contact, they are omitted from the batch without throwing an
- * exception.
- */
- Collection generateBatch(Transaction txn, ContactId c,
- Collection ids, long maxLatency) throws DbException;
+ long capacity, long maxLatency) throws DbException;
/**
* Returns an offer for the given contact for transmission over a
@@ -232,15 +228,16 @@ public interface DatabaseComponent extends TransactionManager {
throws DbException;
/**
- * Returns a batch of messages for the given contact, with a total length
- * less than or equal to the given length, for transmission over a
- * transport with the given maximum latency. Only messages that have been
- * requested by the contact are returned. Returns null if there are no
- * sendable messages that fit in the given length.
+ * Returns a batch of messages for the given contact, for transmission over
+ * a transport with the given maximum latency. Only messages that have been
+ * requested by the contact are returned. The total length of the messages,
+ * including record headers, will be no more than the given capacity.
+ * Returns null if there are no sendable messages that have been requested
+ * by the contact and would fit in the given capacity.
*/
@Nullable
Collection generateRequestedBatch(Transaction txn, ContactId c,
- int maxLength, long maxLatency) throws DbException;
+ long capacity, long maxLatency) throws DbException;
/**
* Returns the contact with the given ID.
@@ -344,6 +341,30 @@ public interface DatabaseComponent extends TransactionManager {
Collection getMessageIds(Transaction txn, GroupId g,
Metadata query) throws DbException;
+ /**
+ * Returns the IDs of some messages received from the given contact that
+ * need to be acknowledged, up to the given number of messages.
+ *
+ * Read-only.
+ */
+ Collection getMessagesToAck(Transaction txn, ContactId c,
+ int maxMessages) throws DbException;
+
+ /**
+ * Returns the IDs of some messages that are eligible to be sent to the
+ * given contact over a transport with the given maximum latency. The total
+ * length of the messages including record headers will be no more than the
+ * given capacity.
+ *
+ * Unlike {@link #getUnackedMessagesToSend(Transaction, ContactId)} this
+ * method does not return messages that have already been sent unless they
+ * are due for retransmission.
+ *
+ * Read-only.
+ */
+ Collection getMessagesToSend(Transaction txn, ContactId c,
+ long capacity, long maxLatency) throws DbException;
+
/**
* Returns the IDs of any messages that need to be validated.
*
@@ -460,15 +481,30 @@ public interface DatabaseComponent extends TransactionManager {
MessageStatus getMessageStatus(Transaction txn, ContactId c, MessageId m)
throws DbException;
+ /**
+ * Returns the message with the given ID for transmission to the given
+ * contact over a transport with the given maximum latency. Returns null
+ * if the message is no longer visible to the contact.
+ *
+ * @param markAsSent True if the message should be marked as sent.
+ * If false it can be marked as sent by calling
+ * {@link #setMessagesSent(Transaction, ContactId, Collection, long)}.
+ */
+ @Nullable
+ Message getMessageToSend(Transaction txn, ContactId c, MessageId m,
+ long maxLatency, boolean markAsSent) throws DbException;
+
/**
* Returns the IDs of all messages that are eligible to be sent to the
- * given contact, together with their raw lengths. This may include
- * messages that have already been sent and are not yet due for
- * retransmission.
+ * given contact.
+ *
+ * Unlike {@link #getMessagesToSend(Transaction, ContactId, long, long)}
+ * this method may return messages that have already been sent and are
+ * not yet due for retransmission.
*
* Read-only.
*/
- Map getUnackedMessagesToSend(Transaction txn,
+ Collection getUnackedMessagesToSend(Transaction txn,
ContactId c) throws DbException;
/**
@@ -648,6 +684,13 @@ public interface DatabaseComponent extends TransactionManager {
void removeTransportKeys(Transaction txn, TransportId t, KeySetId k)
throws DbException;
+ /**
+ * Records an ack for the given messages as having been sent to the given
+ * contact.
+ */
+ void setAckSent(Transaction txn, ContactId c, Collection acked)
+ throws DbException;
+
/**
* Sets the cleanup timer duration for the given message. This does not
* start the message's cleanup timer.
@@ -694,6 +737,13 @@ public interface DatabaseComponent extends TransactionManager {
void setMessageState(Transaction txn, MessageId m, MessageState state)
throws DbException;
+ /**
+ * Records the given messages as having been sent to the given contact
+ * over a transport with the given maximum latency.
+ */
+ void setMessagesSent(Transaction txn, ContactId c,
+ Collection sent, long maxLatency) throws DbException;
+
/**
* Adds dependencies for a message
*/
diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseExecutor.java b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseExecutor.java
index 256be229a..b1fae6e05 100644
--- a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseExecutor.java
+++ b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseExecutor.java
@@ -18,6 +18,10 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
* submitted, tasks are not run concurrently, and submitting a task will never
* block. Tasks must not run indefinitely. Tasks submitted during shutdown are
* discarded.
+ *
+ * It is not mandatory to use this executor for database tasks. The database
+ * can be accessed from any thread, but this executor's guarantee that tasks
+ * are run in the order they're submitted may be useful in some cases.
*/
@Qualifier
@Target({FIELD, METHOD, PARAMETER})
diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/db/Transaction.java b/bramble-api/src/main/java/org/briarproject/bramble/api/db/Transaction.java
index e54da0064..cdb6e3980 100644
--- a/bramble-api/src/main/java/org/briarproject/bramble/api/db/Transaction.java
+++ b/bramble-api/src/main/java/org/briarproject/bramble/api/db/Transaction.java
@@ -45,6 +45,9 @@ public class Transaction {
/**
* Attaches an event to be broadcast when the transaction has been
* committed. The event will be broadcast on the {@link EventExecutor}.
+ * Events and {@link #attach(Runnable) tasks} are submitted to the
+ * {@link EventExecutor} in the order they were attached to the
+ * transaction.
*/
public void attach(Event e) {
if (actions == null) actions = new ArrayList<>();
@@ -54,6 +57,9 @@ public class Transaction {
/**
* Attaches a task to be executed when the transaction has been
* committed. The task will be run on the {@link EventExecutor}.
+ * {@link #attach(Event) Events} and tasks are submitted to the
+ * {@link EventExecutor} in the order they were attached to the
+ * transaction.
*/
public void attach(Runnable r) {
if (actions == null) actions = new ArrayList<>();
diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/db/TransactionManager.java b/bramble-api/src/main/java/org/briarproject/bramble/api/db/TransactionManager.java
index 6850c5b99..2ed25caf3 100644
--- a/bramble-api/src/main/java/org/briarproject/bramble/api/db/TransactionManager.java
+++ b/bramble-api/src/main/java/org/briarproject/bramble/api/db/TransactionManager.java
@@ -1,51 +1,95 @@
package org.briarproject.bramble.api.db;
+import org.briarproject.bramble.api.event.EventExecutor;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+/**
+ * An interface for managing database transactions.
+ *
+ * Read-only transactions may access the database concurrently. Read-write
+ * transactions access the database exclusively, so starting a read-only or
+ * read-write transaction will block until there are no read-write
+ * transactions in progress.
+ *
+ * Failing to {@link #endTransaction(Transaction) end} a transaction will
+ * prevent other callers from accessing the database, so it is recommended to
+ * use the {@link #transaction(boolean, DbRunnable)},
+ * {@link #transactionWithResult(boolean, DbCallable)} and
+ * {@link #transactionWithNullableResult(boolean, NullableDbCallable)} methods
+ * where possible, which handle committing or aborting the transaction on the
+ * caller's behalf.
+ *
+ * Transactions are not reentrant, i.e. it is not permitted to start a
+ * transaction on a thread that already has a transaction in progress.
+ */
+@ThreadSafe
@NotNullByDefault
public interface TransactionManager {
/**
- * Starts a new transaction and returns an object representing it.
- *
- * This method acquires locks, so it must not be called while holding a
- * lock.
+ * Starts a new transaction and returns an object representing it. This
+ * method acquires the database lock, which is held until
+ * {@link #endTransaction(Transaction)} is called.
*
- * @param readOnly true if the transaction will only be used for reading.
+ * @param readOnly True if the transaction will only be used for reading,
+ * in which case the database lock can be shared with other read-only
+ * transactions.
*/
Transaction startTransaction(boolean readOnly) throws DbException;
/**
* Commits a transaction to the database.
+ * {@link #endTransaction(Transaction)} must be called to release the
+ * database lock.
*/
void commitTransaction(Transaction txn) throws DbException;
/**
- * Ends a transaction. If the transaction has not been committed,
- * it will be aborted. If the transaction has been committed,
- * any events attached to the transaction are broadcast.
- * The database lock will be released in either case.
+ * Ends a transaction. If the transaction has not been committed by
+ * calling {@link #commitTransaction(Transaction)}, it is aborted and the
+ * database lock is released.
+ *
+ * If the transaction has been committed, any
+ * {@link Transaction#attach events} attached to the transaction are
+ * broadcast and any {@link Transaction#attach(Runnable) tasks} attached
+ * to the transaction are submitted to the {@link EventExecutor}. The
+ * database lock is then released.
*/
void endTransaction(Transaction txn);
/**
- * Runs the given task within a transaction.
+ * Runs the given task within a transaction. The database lock is held
+ * while running the task.
+ *
+ * @param readOnly True if the transaction will only be used for reading,
+ * in which case the database lock can be shared with other read-only
+ * transactions.
*/
void transaction(boolean readOnly,
DbRunnable task) throws DbException, E;
/**
* Runs the given task within a transaction and returns the result of the
- * task.
+ * task. The database lock is held while running the task.
+ *
+ * @param readOnly True if the transaction will only be used for reading,
+ * in which case the database lock can be shared with other read-only
+ * transactions.
*/
R transactionWithResult(boolean readOnly,
DbCallable task) throws DbException, E;
/**
* Runs the given task within a transaction and returns the result of the
- * task, which may be null.
+ * task, which may be null. The database lock is held while running the
+ * task.
+ *
+ * @param readOnly True if the transaction will only be used for reading,
+ * in which case the database lock can be shared with other read-only
+ * transactions.
*/
@Nullable
R transactionWithNullableResult(boolean readOnly,
diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/mailbox/MailboxConstants.java b/bramble-api/src/main/java/org/briarproject/bramble/api/mailbox/MailboxConstants.java
new file mode 100644
index 000000000..976e86c90
--- /dev/null
+++ b/bramble-api/src/main/java/org/briarproject/bramble/api/mailbox/MailboxConstants.java
@@ -0,0 +1,23 @@
+package org.briarproject.bramble.api.mailbox;
+
+import static org.briarproject.bramble.api.transport.TransportConstants.MAX_FRAME_LENGTH;
+import static org.briarproject.bramble.api.transport.TransportConstants.MAX_PAYLOAD_LENGTH;
+import static org.briarproject.bramble.api.transport.TransportConstants.STREAM_HEADER_LENGTH;
+import static org.briarproject.bramble.api.transport.TransportConstants.TAG_LENGTH;
+
+public interface MailboxConstants {
+
+ /**
+ * The maximum length of a file that can be uploaded to or downloaded from
+ * a mailbox.
+ */
+ int MAX_FILE_BYTES = 1024 * 1024;
+
+ /**
+ * The maximum length of the plaintext payload of a file, such that the
+ * ciphertext is no more than {@link #MAX_FILE_BYTES}.
+ */
+ int MAX_FILE_PAYLOAD_BYTES =
+ (MAX_FILE_BYTES - TAG_LENGTH - STREAM_HEADER_LENGTH)
+ / MAX_FRAME_LENGTH * MAX_PAYLOAD_LENGTH;
+}
diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/record/RecordWriter.java b/bramble-api/src/main/java/org/briarproject/bramble/api/record/RecordWriter.java
index eb83d4d41..893e5ff53 100644
--- a/bramble-api/src/main/java/org/briarproject/bramble/api/record/RecordWriter.java
+++ b/bramble-api/src/main/java/org/briarproject/bramble/api/record/RecordWriter.java
@@ -12,4 +12,6 @@ public interface RecordWriter {
void flush() throws IOException;
void close() throws IOException;
+
+ long getBytesWritten();
}
diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/DeferredSendHandler.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/DeferredSendHandler.java
new file mode 100644
index 000000000..1966b3bb6
--- /dev/null
+++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/DeferredSendHandler.java
@@ -0,0 +1,15 @@
+package org.briarproject.bramble.api.sync;
+
+import java.util.Collection;
+
+/**
+ * An interface for holding the IDs of messages sent and acked during an
+ * outgoing {@link SyncSession} so they can be recorded in the DB as sent
+ * or acked at some later time.
+ */
+public interface DeferredSendHandler {
+
+ void onAckSent(Collection acked);
+
+ void onMessageSent(MessageId sent);
+}
diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncRecordWriter.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncRecordWriter.java
index 75d4b8401..4234f50fb 100644
--- a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncRecordWriter.java
+++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncRecordWriter.java
@@ -20,4 +20,6 @@ public interface SyncRecordWriter {
void writePriority(Priority p) throws IOException;
void flush() throws IOException;
+
+ long getBytesWritten();
}
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java b/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java
index 921580049..e0aaaecc3 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java
@@ -406,6 +406,12 @@ interface Database {
Collection getMessageIds(T txn, GroupId g, Metadata query)
throws DbException;
+ /**
+ * Returns the length of the given message in bytes, including the
+ * message header.
+ */
+ int getMessageLength(T txn, MessageId m) throws DbException;
+
/**
* Returns the metadata for all delivered messages in the given group.
*
@@ -496,7 +502,8 @@ interface Database {
/**
* Returns the IDs of some messages that are eligible to be sent to the
- * given contact, up to the given total length.
+ * given contact. The total length of the messages including record headers
+ * will be no more than the given capacity.
*
* Unlike {@link #getUnackedMessagesToSend(Object, ContactId)} this method
* does not return messages that have already been sent unless they are
@@ -504,20 +511,20 @@ interface Database {
*
* Read-only.
*/
- Collection getMessagesToSend(T txn, ContactId c, int maxLength,
+ Collection getMessagesToSend(T txn, ContactId c, long capacity,
long maxLatency) throws DbException;
/**
* Returns the IDs of all messages that are eligible to be sent to the
- * given contact, together with their raw lengths.
+ * given contact.
*
- * Unlike {@link #getMessagesToSend(Object, ContactId, int, long)} this
+ * Unlike {@link #getMessagesToSend(Object, ContactId, long, long)} this
* method may return messages that have already been sent and are not yet
* due for retransmission.
*
* Read-only.
*/
- Map getUnackedMessagesToSend(T txn, ContactId c)
+ Collection getUnackedMessagesToSend(T txn, ContactId c)
throws DbException;
/**
@@ -598,13 +605,14 @@ interface Database {
/**
* Returns the IDs of some messages that are eligible to be sent to the
- * given contact and have been requested by the contact, up to the given
- * total length.
+ * given contact and have been requested by the contact. The total length
+ * of the messages including record headers will be no more than the given
+ * capacity.
*
* Read-only.
*/
Collection getRequestedMessagesToSend(T txn, ContactId c,
- int maxLength, long maxLatency) throws DbException;
+ long capacity, long maxLatency) throws DbException;
/**
* Returns all settings in the given namespace.
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java
index 7e4eded83..64b6fc3c3 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java
@@ -75,7 +75,6 @@ import org.briarproject.bramble.api.transport.TransportKeys;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -87,6 +86,7 @@ import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
+import static java.util.Collections.singletonList;
import static java.util.logging.Level.WARNING;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE;
@@ -424,13 +424,14 @@ class DatabaseComponentImpl implements DatabaseComponent {
@Nullable
@Override
public Collection generateBatch(Transaction transaction,
- ContactId c, int maxLength, long maxLatency) throws DbException {
+ ContactId c, long capacity, long maxLatency) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
Collection ids =
- db.getMessagesToSend(txn, c, maxLength, maxLatency);
+ db.getMessagesToSend(txn, c, capacity, maxLatency);
+ if (ids.isEmpty()) return null;
long totalLength = 0;
List messages = new ArrayList<>(ids.size());
for (MessageId m : ids) {
@@ -439,38 +440,11 @@ class DatabaseComponentImpl implements DatabaseComponent {
messages.add(message);
db.updateRetransmissionData(txn, c, m, maxLatency);
}
- if (ids.isEmpty()) return null;
db.lowerRequestedFlag(txn, c, ids);
transaction.attach(new MessagesSentEvent(c, ids, totalLength));
return messages;
}
- @Override
- public Collection generateBatch(Transaction transaction,
- ContactId c, Collection ids, long maxLatency)
- throws DbException {
- if (transaction.isReadOnly()) throw new IllegalArgumentException();
- T txn = unbox(transaction);
- if (!db.containsContact(txn, c))
- throw new NoSuchContactException();
- long totalLength = 0;
- List messages = new ArrayList<>(ids.size());
- List sentIds = new ArrayList<>(ids.size());
- for (MessageId m : ids) {
- if (db.containsVisibleMessage(txn, c, m)) {
- Message message = db.getMessage(txn, m);
- totalLength += message.getRawLength();
- messages.add(message);
- sentIds.add(m);
- db.updateRetransmissionData(txn, c, m, maxLatency);
- }
- }
- if (messages.isEmpty()) return messages;
- db.lowerRequestedFlag(txn, c, sentIds);
- transaction.attach(new MessagesSentEvent(c, sentIds, totalLength));
- return messages;
- }
-
@Nullable
@Override
public Offer generateOffer(Transaction transaction, ContactId c,
@@ -505,13 +479,14 @@ class DatabaseComponentImpl implements DatabaseComponent {
@Nullable
@Override
public Collection generateRequestedBatch(Transaction transaction,
- ContactId c, int maxLength, long maxLatency) throws DbException {
+ ContactId c, long capacity, long maxLatency) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
Collection ids =
- db.getRequestedMessagesToSend(txn, c, maxLength, maxLatency);
+ db.getRequestedMessagesToSend(txn, c, capacity, maxLatency);
+ if (ids.isEmpty()) return null;
long totalLength = 0;
List messages = new ArrayList<>(ids.size());
for (MessageId m : ids) {
@@ -520,7 +495,6 @@ class DatabaseComponentImpl implements DatabaseComponent {
messages.add(message);
db.updateRetransmissionData(txn, c, m, maxLatency);
}
- if (ids.isEmpty()) return null;
db.lowerRequestedFlag(txn, c, ids);
transaction.attach(new MessagesSentEvent(c, ids, totalLength));
return messages;
@@ -635,6 +609,24 @@ class DatabaseComponentImpl implements DatabaseComponent {
return db.getMessageIds(txn, g, query);
}
+ @Override
+ public Collection getMessagesToAck(Transaction transaction,
+ ContactId c, int maxMessages) throws DbException {
+ T txn = unbox(transaction);
+ if (!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ return db.getMessagesToAck(txn, c, maxMessages);
+ }
+
+ @Override
+ public Collection getMessagesToSend(Transaction transaction,
+ ContactId c, long capacity, long maxLatency) throws DbException {
+ T txn = unbox(transaction);
+ if (!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ return db.getMessagesToSend(txn, c, capacity, maxLatency);
+ }
+
@Override
public Collection getMessagesToValidate(Transaction transaction)
throws DbException {
@@ -740,10 +732,29 @@ class DatabaseComponentImpl implements DatabaseComponent {
return status;
}
+ @Nullable
@Override
- public Map getUnackedMessagesToSend(
- Transaction transaction,
- ContactId c) throws DbException {
+ public Message getMessageToSend(Transaction transaction, ContactId c,
+ MessageId m, long maxLatency, boolean markAsSent)
+ throws DbException {
+ if (transaction.isReadOnly()) throw new IllegalArgumentException();
+ T txn = unbox(transaction);
+ if (!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ if (!db.containsVisibleMessage(txn, c, m)) return null;
+ Message message = db.getMessage(txn, m);
+ if (markAsSent) {
+ db.updateRetransmissionData(txn, c, m, maxLatency);
+ db.lowerRequestedFlag(txn, c, singletonList(m));
+ transaction.attach(new MessagesSentEvent(c, singletonList(m),
+ message.getRawLength()));
+ }
+ return message;
+ }
+
+ @Override
+ public Collection getUnackedMessagesToSend(
+ Transaction transaction, ContactId c) throws DbException {
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
@@ -1069,6 +1080,20 @@ class DatabaseComponentImpl implements DatabaseComponent {
db.removeTransportKeys(txn, t, k);
}
+ @Override
+ public void setAckSent(Transaction transaction, ContactId c,
+ Collection acked) throws DbException {
+ if (transaction.isReadOnly()) throw new IllegalArgumentException();
+ T txn = unbox(transaction);
+ if (!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ List visible = new ArrayList<>(acked.size());
+ for (MessageId m : acked) {
+ if (db.containsVisibleMessage(txn, c, m)) visible.add(m);
+ }
+ db.lowerAckFlag(txn, c, visible);
+ }
+
@Override
public void setCleanupTimerDuration(Transaction transaction, MessageId m,
long duration) throws DbException {
@@ -1115,7 +1140,7 @@ class DatabaseComponentImpl implements DatabaseComponent {
if (old == INVISIBLE) db.addGroupVisibility(txn, c, g, v == SHARED);
else if (v == INVISIBLE) db.removeGroupVisibility(txn, c, g);
else db.setGroupVisibility(txn, c, g, v == SHARED);
- List affected = Collections.singletonList(c);
+ List affected = singletonList(c);
transaction.attach(new GroupVisibilityUpdatedEvent(affected));
}
@@ -1163,6 +1188,28 @@ class DatabaseComponentImpl implements DatabaseComponent {
transaction.attach(new MessageStateChangedEvent(m, false, state));
}
+ @Override
+ public void setMessagesSent(Transaction transaction, ContactId c,
+ Collection sent, long maxLatency) throws DbException {
+ if (transaction.isReadOnly()) throw new IllegalArgumentException();
+ T txn = unbox(transaction);
+ if (!db.containsContact(txn, c))
+ throw new NoSuchContactException();
+ long totalLength = 0;
+ List visible = new ArrayList<>(sent.size());
+ for (MessageId m : sent) {
+ if (db.containsVisibleMessage(txn, c, m)) {
+ visible.add(m);
+ totalLength += db.getMessageLength(txn, m);
+ db.updateRetransmissionData(txn, c, m, maxLatency);
+ }
+ }
+ db.lowerRequestedFlag(txn, c, visible);
+ if (!visible.isEmpty()) {
+ transaction.attach(new MessagesSentEvent(c, visible, totalLength));
+ }
+ }
+
@Override
public void addMessageDependencies(Transaction transaction,
Message dependent, Collection dependencies)
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java
index 0dfe41bd9..3e16e4223 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java
@@ -51,7 +51,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -77,6 +76,7 @@ import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.api.db.DatabaseComponent.NO_CLEANUP_DEADLINE;
import static org.briarproject.bramble.api.db.DatabaseComponent.TIMER_NOT_STARTED;
import static org.briarproject.bramble.api.db.Metadata.REMOVE;
+import static org.briarproject.bramble.api.record.Record.RECORD_HEADER_BYTES;
import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE;
import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED;
import static org.briarproject.bramble.api.sync.Group.Visibility.VISIBLE;
@@ -1908,6 +1908,31 @@ abstract class JdbcDatabase implements Database {
}
}
+ @Override
+ public int getMessageLength(Connection txn, MessageId m)
+ throws DbException {
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ try {
+ String sql = "SELECT length from messages"
+ + " WHERE messageId = ? AND state = ?";
+ ps = txn.prepareStatement(sql);
+ ps.setBytes(1, m.getBytes());
+ ps.setInt(2, DELIVERED.getValue());
+ rs = ps.executeQuery();
+ if (!rs.next()) throw new DbStateException();
+ int length = rs.getInt(1);
+ if (rs.next()) throw new DbStateException();
+ rs.close();
+ ps.close();
+ return length;
+ } catch (SQLException e) {
+ tryToClose(rs, LOG, WARNING);
+ tryToClose(ps, LOG, WARNING);
+ throw new DbException(e);
+ }
+ }
+
@Override
public Map getMessageMetadata(Connection txn,
GroupId g) throws DbException {
@@ -2256,8 +2281,8 @@ abstract class JdbcDatabase implements Database {
}
@Override
- public Collection getMessagesToSend(Connection txn, ContactId c,
- int maxLength, long maxLatency) throws DbException {
+ public Collection getMessagesToSend(Connection txn,
+ ContactId c, long capacity, long maxLatency) throws DbException {
long now = clock.currentTimeMillis();
PreparedStatement ps = null;
ResultSet rs = null;
@@ -2277,12 +2302,11 @@ abstract class JdbcDatabase implements Database {
ps.setLong(4, maxLatency);
rs = ps.executeQuery();
List ids = new ArrayList<>();
- int total = 0;
while (rs.next()) {
int length = rs.getInt(1);
- if (total + length > maxLength) break;
+ if (capacity < RECORD_HEADER_BYTES + length) break;
ids.add(new MessageId(rs.getBytes(2)));
- total += length;
+ capacity -= RECORD_HEADER_BYTES + length;
}
rs.close();
ps.close();
@@ -2295,12 +2319,12 @@ abstract class JdbcDatabase implements Database {
}
@Override
- public Map getUnackedMessagesToSend(Connection txn,
+ public Collection getUnackedMessagesToSend(Connection txn,
ContactId c) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
- String sql = "SELECT length, messageId FROM statuses"
+ String sql = "SELECT messageId FROM statuses"
+ " WHERE contactId = ? AND state = ?"
+ " AND groupShared = TRUE AND messageShared = TRUE"
+ " AND deleted = FALSE AND seen = FALSE"
@@ -2309,15 +2333,11 @@ abstract class JdbcDatabase implements Database {
ps.setInt(1, c.getInt());
ps.setInt(2, DELIVERED.getValue());
rs = ps.executeQuery();
- Map results = new LinkedHashMap<>();
- while (rs.next()) {
- int length = rs.getInt(1);
- MessageId id = new MessageId(rs.getBytes(2));
- results.put(id, length);
- }
+ List ids = new ArrayList<>();
+ while (rs.next()) ids.add(new MessageId(rs.getBytes(1)));
rs.close();
ps.close();
- return results;
+ return ids;
} catch (SQLException e) {
tryToClose(rs, LOG, WARNING);
tryToClose(ps, LOG, WARNING);
@@ -2430,6 +2450,7 @@ abstract class JdbcDatabase implements Database {
MessageId m = new MessageId(rs.getBytes(1));
GroupId g = new GroupId(rs.getBytes(2));
Collection messageIds = ids.get(g);
+ //noinspection Java8MapApi
if (messageIds == null) {
messageIds = new ArrayList<>();
ids.put(g, messageIds);
@@ -2556,7 +2577,7 @@ abstract class JdbcDatabase implements Database {
@Override
public Collection getRequestedMessagesToSend(Connection txn,
- ContactId c, int maxLength, long maxLatency) throws DbException {
+ ContactId c, long capacity, long maxLatency) throws DbException {
long now = clock.currentTimeMillis();
PreparedStatement ps = null;
ResultSet rs = null;
@@ -2576,12 +2597,11 @@ abstract class JdbcDatabase implements Database {
ps.setLong(4, maxLatency);
rs = ps.executeQuery();
List ids = new ArrayList<>();
- int total = 0;
while (rs.next()) {
int length = rs.getInt(1);
- if (total + length > maxLength) break;
+ if (capacity < RECORD_HEADER_BYTES + length) break;
ids.add(new MessageId(rs.getBytes(2)));
- total += length;
+ capacity -= RECORD_HEADER_BYTES + length;
}
rs.close();
ps.close();
@@ -2735,6 +2755,7 @@ abstract class JdbcDatabase implements Database {
ContactId c = new ContactId(rs.getInt(1));
TransportId t = new TransportId(rs.getString(2));
Collection transportIds = ids.get(c);
+ //noinspection Java8MapApi
if (transportIds == null) {
transportIds = new ArrayList<>();
ids.put(c, transportIds);
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/record/RecordWriterImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/record/RecordWriterImpl.java
index ec9f5c0c0..137af59dc 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/record/RecordWriterImpl.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/record/RecordWriterImpl.java
@@ -19,6 +19,8 @@ class RecordWriterImpl implements RecordWriter {
private final OutputStream out;
private final byte[] header = new byte[RECORD_HEADER_BYTES];
+ private long bytesWritten = 0;
+
RecordWriterImpl(OutputStream out) {
this.out = out;
}
@@ -31,6 +33,7 @@ class RecordWriterImpl implements RecordWriter {
ByteUtils.writeUint16(payload.length, header, 2);
out.write(header);
out.write(payload);
+ bytesWritten += RECORD_HEADER_BYTES + payload.length;
}
@Override
@@ -42,4 +45,9 @@ class RecordWriterImpl implements RecordWriter {
public void close() throws IOException {
out.close();
}
+
+ @Override
+ public long getBytesWritten() {
+ return bytesWritten;
+ }
}
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java
index 92e287203..349b8d37b 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java
@@ -13,11 +13,13 @@ import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.plugin.event.TransportInactiveEvent;
+import org.briarproject.bramble.api.record.Record;
import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.Offer;
import org.briarproject.bramble.api.sync.Priority;
import org.briarproject.bramble.api.sync.Request;
+import org.briarproject.bramble.api.sync.SyncConstants;
import org.briarproject.bramble.api.sync.SyncRecordWriter;
import org.briarproject.bramble.api.sync.SyncSession;
import org.briarproject.bramble.api.sync.Versions;
@@ -47,8 +49,9 @@ import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.api.lifecycle.LifecycleManager.LifecycleState.STOPPING;
-import static org.briarproject.bramble.api.record.Record.MAX_RECORD_PAYLOAD_BYTES;
+import static org.briarproject.bramble.api.record.Record.RECORD_HEADER_BYTES;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
+import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_LENGTH;
import static org.briarproject.bramble.api.sync.SyncConstants.SUPPORTED_VERSIONS;
import static org.briarproject.bramble.util.LogUtils.logException;
@@ -71,6 +74,16 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
NEXT_SEND_TIME_DECREASED = () -> {
};
+ /**
+ * The batch capacity must be at least {@link Record#RECORD_HEADER_BYTES}
+ * + {@link SyncConstants#MAX_MESSAGE_LENGTH} to ensure that maximum-size
+ * messages can be selected for transmission. Larger batches will mean
+ * fewer round-trips between the DB and the output stream, but each
+ * round-trip will block the DB for longer.
+ */
+ private static final int BATCH_CAPACITY =
+ (RECORD_HEADER_BYTES + MAX_MESSAGE_LENGTH) * 2;
+
private final DatabaseComponent db;
private final Executor dbExecutor;
private final EventBus eventBus;
@@ -296,8 +309,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
db.transactionWithNullableResult(false, txn -> {
Collection batch =
db.generateRequestedBatch(txn, contactId,
- MAX_RECORD_PAYLOAD_BYTES,
- maxLatency);
+ BATCH_CAPACITY, maxLatency);
setNextSendTime(db.getNextSendTime(txn, contactId));
return batch;
});
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/EagerSimplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/EagerSimplexOutgoingSession.java
new file mode 100644
index 000000000..1cc62de9f
--- /dev/null
+++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/EagerSimplexOutgoingSession.java
@@ -0,0 +1,66 @@
+package org.briarproject.bramble.sync;
+
+import org.briarproject.bramble.api.contact.ContactId;
+import org.briarproject.bramble.api.db.DatabaseComponent;
+import org.briarproject.bramble.api.db.DbException;
+import org.briarproject.bramble.api.event.EventBus;
+import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
+import org.briarproject.bramble.api.plugin.TransportId;
+import org.briarproject.bramble.api.sync.Message;
+import org.briarproject.bramble.api.sync.MessageId;
+import org.briarproject.bramble.api.sync.SyncRecordWriter;
+import org.briarproject.bramble.api.transport.StreamWriter;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.logging.Logger;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import static java.util.logging.Level.INFO;
+import static java.util.logging.Logger.getLogger;
+
+/**
+ * A {@link SimplexOutgoingSession} that sends messages eagerly, ie
+ * regardless of whether they're due for retransmission.
+ */
+@ThreadSafe
+@NotNullByDefault
+class EagerSimplexOutgoingSession extends SimplexOutgoingSession {
+
+ private static final Logger LOG =
+ getLogger(EagerSimplexOutgoingSession.class.getName());
+
+ EagerSimplexOutgoingSession(DatabaseComponent db,
+ EventBus eventBus,
+ ContactId contactId,
+ TransportId transportId,
+ long maxLatency,
+ StreamWriter streamWriter,
+ SyncRecordWriter recordWriter) {
+ super(db, eventBus, contactId, transportId, maxLatency, streamWriter,
+ recordWriter);
+ }
+
+ @Override
+ void sendMessages() throws DbException, IOException {
+ for (MessageId m : loadUnackedMessageIdsToSend()) {
+ if (isInterrupted()) break;
+ Message message = db.transactionWithNullableResult(false, txn ->
+ db.getMessageToSend(txn, contactId, m, maxLatency, true));
+ if (message == null) continue; // No longer shared
+ recordWriter.writeMessage(message);
+ LOG.info("Sent message");
+ }
+ }
+
+ private Collection loadUnackedMessageIdsToSend()
+ throws DbException {
+ Collection ids = db.transactionWithResult(true, txn ->
+ db.getUnackedMessagesToSend(txn, contactId));
+ if (LOG.isLoggable(INFO)) {
+ LOG.info(ids.size() + " unacked messages to send");
+ }
+ return ids;
+ }
+}
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java
new file mode 100644
index 000000000..5d4e06fb4
--- /dev/null
+++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java
@@ -0,0 +1,117 @@
+package org.briarproject.bramble.sync;
+
+import org.briarproject.bramble.api.contact.ContactId;
+import org.briarproject.bramble.api.db.DatabaseComponent;
+import org.briarproject.bramble.api.db.DbException;
+import org.briarproject.bramble.api.event.EventBus;
+import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
+import org.briarproject.bramble.api.plugin.TransportId;
+import org.briarproject.bramble.api.sync.Ack;
+import org.briarproject.bramble.api.sync.DeferredSendHandler;
+import org.briarproject.bramble.api.sync.Message;
+import org.briarproject.bramble.api.sync.MessageId;
+import org.briarproject.bramble.api.sync.SyncRecordWriter;
+import org.briarproject.bramble.api.transport.StreamWriter;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.logging.Logger;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import static java.lang.Math.min;
+import static java.util.Collections.emptyList;
+import static java.util.logging.Level.INFO;
+import static java.util.logging.Logger.getLogger;
+import static org.briarproject.bramble.api.record.Record.RECORD_HEADER_BYTES;
+import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
+import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
+
+/**
+ * A {@link SimplexOutgoingSession} for sending and acking messages via a
+ * mailbox. The session uses a {@link DeferredSendHandler} to record the IDs
+ * of the messages sent and acked during the session so that they can be
+ * recorded in the DB as sent or acked after the file has been successfully
+ * uploaded to the mailbox.
+ */
+@ThreadSafe
+@NotNullByDefault
+class MailboxOutgoingSession extends SimplexOutgoingSession {
+
+ private static final Logger LOG =
+ getLogger(MailboxOutgoingSession.class.getName());
+
+ private final DeferredSendHandler deferredSendHandler;
+ private final long initialCapacity;
+
+ MailboxOutgoingSession(DatabaseComponent db,
+ EventBus eventBus,
+ ContactId contactId,
+ TransportId transportId,
+ long maxLatency,
+ StreamWriter streamWriter,
+ SyncRecordWriter recordWriter,
+ DeferredSendHandler deferredSendHandler,
+ long capacity) {
+ super(db, eventBus, contactId, transportId, maxLatency, streamWriter,
+ recordWriter);
+ this.deferredSendHandler = deferredSendHandler;
+ this.initialCapacity = capacity;
+ }
+
+ @Override
+ void sendAcks() throws DbException, IOException {
+ while (!isInterrupted()) {
+ Collection idsToAck = loadMessageIdsToAck();
+ if (idsToAck.isEmpty()) break;
+ recordWriter.writeAck(new Ack(idsToAck));
+ deferredSendHandler.onAckSent(idsToAck);
+ LOG.info("Sent ack");
+ }
+ }
+
+ private Collection loadMessageIdsToAck() throws DbException {
+ long idCapacity = (getRemainingCapacity() - RECORD_HEADER_BYTES)
+ / MessageId.LENGTH;
+ if (idCapacity <= 0) return emptyList(); // Out of capacity
+ int maxMessageIds = (int) min(idCapacity, MAX_MESSAGE_IDS);
+ Collection ids = db.transactionWithResult(true, txn ->
+ db.getMessagesToAck(txn, contactId, maxMessageIds));
+ if (LOG.isLoggable(INFO)) {
+ LOG.info(ids.size() + " messages to ack");
+ }
+ return ids;
+ }
+
+ private long getRemainingCapacity() {
+ return initialCapacity - recordWriter.getBytesWritten();
+ }
+
+ @Override
+ void sendMessages() throws DbException, IOException {
+ for (MessageId m : loadMessageIdsToSend()) {
+ if (isInterrupted()) break;
+ // Defer marking the message as sent
+ Message message = db.transactionWithNullableResult(true, txn ->
+ db.getMessageToSend(txn, contactId, m, maxLatency, false));
+ if (message == null) continue; // No longer shared
+ recordWriter.writeMessage(message);
+ deferredSendHandler.onMessageSent(m);
+ LOG.info("Sent message");
+ }
+ }
+
+ private Collection loadMessageIdsToSend() throws DbException {
+ long capacity = getRemainingCapacity();
+ if (capacity < RECORD_HEADER_BYTES + MESSAGE_HEADER_LENGTH) {
+ return emptyList(); // Out of capacity
+ }
+ Collection ids = db.transactionWithResult(true, txn ->
+ db.getMessagesToSend(txn, contactId, capacity, maxLatency));
+ if (LOG.isLoggable(INFO)) {
+ LOG.info(ids.size() + " messages to send");
+ }
+ return ids;
+ }
+
+}
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java
index 90c6be0b4..a9efdcabe 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java
@@ -3,7 +3,6 @@ package org.briarproject.bramble.sync;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.contact.event.ContactRemovedEvent;
import org.briarproject.bramble.api.db.DatabaseComponent;
-import org.briarproject.bramble.api.db.DatabaseExecutor;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.event.EventBus;
@@ -13,9 +12,10 @@ import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.plugin.event.TransportInactiveEvent;
+import org.briarproject.bramble.api.record.Record;
import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.Message;
-import org.briarproject.bramble.api.sync.MessageId;
+import org.briarproject.bramble.api.sync.SyncConstants;
import org.briarproject.bramble.api.sync.SyncRecordWriter;
import org.briarproject.bramble.api.sync.SyncSession;
import org.briarproject.bramble.api.sync.Versions;
@@ -23,15 +23,7 @@ import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent;
import org.briarproject.bramble.api.transport.StreamWriter;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import javax.annotation.concurrent.ThreadSafe;
@@ -40,8 +32,9 @@ import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.api.lifecycle.LifecycleManager.LifecycleState.STOPPING;
-import static org.briarproject.bramble.api.record.Record.MAX_RECORD_PAYLOAD_BYTES;
+import static org.briarproject.bramble.api.record.Record.RECORD_HEADER_BYTES;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
+import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_LENGTH;
import static org.briarproject.bramble.api.sync.SyncConstants.SUPPORTED_VERSIONS;
import static org.briarproject.bramble.util.LogUtils.logException;
@@ -57,38 +50,40 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
private static final Logger LOG =
getLogger(SimplexOutgoingSession.class.getName());
- private static final ThrowingRunnable CLOSE = () -> {
- };
+ /**
+ * The batch capacity must be at least {@link Record#RECORD_HEADER_BYTES}
+ * + {@link SyncConstants#MAX_MESSAGE_LENGTH} to ensure that maximum-size
+ * messages can be selected for transmission. Larger batches will mean
+ * fewer round-trips between the DB and the output stream, but each
+ * round-trip will block the DB for longer.
+ */
+ static final int BATCH_CAPACITY =
+ (RECORD_HEADER_BYTES + MAX_MESSAGE_LENGTH) * 2;
- private final DatabaseComponent db;
- private final Executor dbExecutor;
- private final EventBus eventBus;
- private final ContactId contactId;
- private final TransportId transportId;
- private final long maxLatency;
- private final boolean eager;
- private final StreamWriter streamWriter;
- private final SyncRecordWriter recordWriter;
- private final AtomicInteger outstandingQueries;
- private final BlockingQueue> writerTasks;
+ protected final DatabaseComponent db;
+ protected final EventBus eventBus;
+ protected final ContactId contactId;
+ protected final TransportId transportId;
+ protected final long maxLatency;
+ protected final StreamWriter streamWriter;
+ protected final SyncRecordWriter recordWriter;
private volatile boolean interrupted = false;
- SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
- EventBus eventBus, ContactId contactId, TransportId transportId,
- long maxLatency, boolean eager, StreamWriter streamWriter,
+ SimplexOutgoingSession(DatabaseComponent db,
+ EventBus eventBus,
+ ContactId contactId,
+ TransportId transportId,
+ long maxLatency,
+ StreamWriter streamWriter,
SyncRecordWriter recordWriter) {
this.db = db;
- this.dbExecutor = dbExecutor;
this.eventBus = eventBus;
this.contactId = contactId;
this.transportId = transportId;
this.maxLatency = maxLatency;
- this.eager = eager;
this.streamWriter = streamWriter;
this.recordWriter = recordWriter;
- outstandingQueries = new AtomicInteger(2); // One per type of record
- writerTasks = new LinkedBlockingQueue<>();
}
@IoExecutor
@@ -98,22 +93,13 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
try {
// Send our supported protocol versions
recordWriter.writeVersions(new Versions(SUPPORTED_VERSIONS));
- // Start a query for each type of record
- dbExecutor.execute(this::generateAck);
- if (eager) dbExecutor.execute(this::loadUnackedMessageIds);
- else dbExecutor.execute(this::generateBatch);
- // Write records until interrupted or no more records to write
try {
- while (!interrupted) {
- ThrowingRunnable task = writerTasks.take();
- if (task == CLOSE) break;
- task.run();
- }
- streamWriter.sendEndOfStream();
- } catch (InterruptedException e) {
- LOG.info("Interrupted while waiting for a record to write");
- Thread.currentThread().interrupt();
+ sendAcks();
+ sendMessages();
+ } catch (DbException e) {
+ logException(LOG, WARNING, e);
}
+ streamWriter.sendEndOfStream();
} finally {
eventBus.removeListener(this);
}
@@ -122,11 +108,10 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
@Override
public void interrupt() {
interrupted = true;
- writerTasks.add(CLOSE);
}
- private void decrementOutstandingQueries() {
- if (outstandingQueries.decrementAndGet() == 0) writerTasks.add(CLOSE);
+ boolean isInterrupted() {
+ return interrupted;
}
@Override
@@ -146,110 +131,33 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
}
}
- @DatabaseExecutor
- private void loadUnackedMessageIds() {
- if (interrupted) return;
- try {
- Map ids = db.transactionWithResult(true, txn ->
- db.getUnackedMessagesToSend(txn, contactId));
- if (LOG.isLoggable(INFO)) {
- LOG.info(ids.size() + " unacked messages to send");
- }
- if (ids.isEmpty()) decrementOutstandingQueries();
- else dbExecutor.execute(() -> generateEagerBatch(ids));
- } catch (DbException e) {
- logException(LOG, WARNING, e);
- interrupt();
- }
+ void sendAcks() throws DbException, IOException {
+ while (!isInterrupted()) if (!generateAndSendAck()) break;
}
- @DatabaseExecutor
- private void generateEagerBatch(Map ids) {
- if (interrupted) return;
- // Take some message IDs from `ids` to form a batch
- Collection batchIds = new ArrayList<>();
- long totalLength = 0;
- Iterator> it = ids.entrySet().iterator();
- while (it.hasNext()) {
- // Check whether the next message will fit in the batch
- Entry e = it.next();
- int length = e.getValue();
- if (totalLength + length > MAX_RECORD_PAYLOAD_BYTES) break;
- // Add the message to the batch
- it.remove();
- batchIds.add(e.getKey());
- totalLength += length;
- }
- if (batchIds.isEmpty()) throw new AssertionError();
- try {
- Collection batch =
- db.transactionWithResult(false, txn ->
- db.generateBatch(txn, contactId, batchIds,
- maxLatency));
- writerTasks.add(() -> writeEagerBatch(batch, ids));
- } catch (DbException e) {
- logException(LOG, WARNING, e);
- interrupt();
- }
- }
-
- @IoExecutor
- private void writeEagerBatch(Collection batch,
- Map ids) throws IOException {
- if (interrupted) return;
- for (Message m : batch) recordWriter.writeMessage(m);
- LOG.info("Sent eager batch");
- if (ids.isEmpty()) decrementOutstandingQueries();
- else dbExecutor.execute(() -> generateEagerBatch(ids));
- }
-
- @DatabaseExecutor
- private void generateAck() {
- if (interrupted) return;
- try {
- Ack a = db.transactionWithNullableResult(false, txn ->
- db.generateAck(txn, contactId, MAX_MESSAGE_IDS));
- if (LOG.isLoggable(INFO))
- LOG.info("Generated ack: " + (a != null));
- if (a == null) decrementOutstandingQueries();
- else writerTasks.add(() -> writeAck(a));
- } catch (DbException e) {
- logException(LOG, WARNING, e);
- interrupt();
- }
- }
-
- @IoExecutor
- private void writeAck(Ack ack) throws IOException {
- if (interrupted) return;
- recordWriter.writeAck(ack);
+ private boolean generateAndSendAck() throws DbException, IOException {
+ Ack a = db.transactionWithNullableResult(false, txn ->
+ db.generateAck(txn, contactId, MAX_MESSAGE_IDS));
+ if (LOG.isLoggable(INFO))
+ LOG.info("Generated ack: " + (a != null));
+ if (a == null) return false; // No more acks to send
+ recordWriter.writeAck(a);
LOG.info("Sent ack");
- dbExecutor.execute(this::generateAck);
+ return true;
}
- @DatabaseExecutor
- private void generateBatch() {
- if (interrupted) return;
- try {
- Collection b =
- db.transactionWithNullableResult(false, txn ->
- db.generateBatch(txn, contactId,
- MAX_RECORD_PAYLOAD_BYTES, maxLatency));
- if (LOG.isLoggable(INFO))
- LOG.info("Generated batch: " + (b != null));
- if (b == null) decrementOutstandingQueries();
- else writerTasks.add(() -> writeBatch(b));
- } catch (DbException e) {
- logException(LOG, WARNING, e);
- interrupt();
- }
+ void sendMessages() throws DbException, IOException {
+ while (!isInterrupted()) if (!generateAndSendBatch()) break;
}
- @IoExecutor
- private void writeBatch(Collection batch) throws IOException {
- if (interrupted) return;
- for (Message m : batch) recordWriter.writeMessage(m);
+ private boolean generateAndSendBatch() throws DbException, IOException {
+ Collection b = db.transactionWithNullableResult(false, txn ->
+ db.generateBatch(txn, contactId, BATCH_CAPACITY, maxLatency));
+ if (LOG.isLoggable(INFO))
+ LOG.info("Generated batch: " + (b != null));
+ if (b == null) return false; // No more messages to send
+ for (Message m : b) recordWriter.writeMessage(m);
LOG.info("Sent batch");
- dbExecutor.execute(this::generateBatch);
+ return true;
}
}
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncRecordWriterImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncRecordWriterImpl.java
index d2b4e73f1..38964b640 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncRecordWriterImpl.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncRecordWriterImpl.java
@@ -85,4 +85,9 @@ class SyncRecordWriterImpl implements SyncRecordWriter {
public void flush() throws IOException {
writer.flush();
}
+
+ @Override
+ public long getBytesWritten() {
+ return writer.getBytesWritten();
+ }
}
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java
index 70fd0df8b..fef6d7e45 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java
@@ -64,8 +64,13 @@ class SyncSessionFactoryImpl implements SyncSessionFactory {
OutputStream out = streamWriter.getOutputStream();
SyncRecordWriter recordWriter =
recordWriterFactory.createRecordWriter(out);
- return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, t,
- maxLatency, eager, streamWriter, recordWriter);
+ if (eager) {
+ return new EagerSimplexOutgoingSession(db, eventBus, c, t,
+ maxLatency, streamWriter, recordWriter);
+ } else {
+ return new SimplexOutgoingSession(db, eventBus, c, t,
+ maxLatency, streamWriter, recordWriter);
+ }
}
@Override
diff --git a/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java b/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java
index 3af7a791a..e288e5507 100644
--- a/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java
+++ b/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java
@@ -72,6 +72,7 @@ import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.HOURS;
import static org.briarproject.bramble.api.db.DatabaseComponent.TIMER_NOT_STARTED;
+import static org.briarproject.bramble.api.record.Record.RECORD_HEADER_BYTES;
import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE;
import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED;
import static org.briarproject.bramble.api.sync.Group.Visibility.VISIBLE;
@@ -94,11 +95,15 @@ import static org.briarproject.bramble.test.TestUtils.getTransportId;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class DatabaseComponentImplTest extends BrambleMockTestCase {
+ private static final int BATCH_CAPACITY =
+ (RECORD_HEADER_BYTES + MAX_MESSAGE_LENGTH) * 2;
+
@SuppressWarnings("unchecked")
private final Database