diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/connection/ConnectionManager.java b/bramble-api/src/main/java/org/briarproject/bramble/api/connection/ConnectionManager.java index 3d7df9bea..63a9b36ae 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/connection/ConnectionManager.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/connection/ConnectionManager.java @@ -7,6 +7,7 @@ import org.briarproject.bramble.api.plugin.TransportConnectionReader; import org.briarproject.bramble.api.plugin.TransportConnectionWriter; import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; +import org.briarproject.bramble.api.sync.OutgoingSessionRecord; @NotNullByDefault public interface ConnectionManager { @@ -45,6 +46,14 @@ public interface ConnectionManager { void manageOutgoingConnection(ContactId c, TransportId t, TransportConnectionWriter w); + /** + * Manages an outgoing connection to a contact via a mailbox. The IDs of + * any messages sent or acked are added to the given + * {@link OutgoingSessionRecord}. + */ + void manageOutgoingConnection(ContactId c, TransportId t, + TransportConnectionWriter w, OutgoingSessionRecord sessionRecord); + /** * Manages an outgoing connection to a contact over a duplex transport. */ diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java index 3869e71a6..a934fa6c8 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java @@ -126,16 +126,11 @@ public interface DatabaseComponent extends TransactionManager { TransportKeys k) throws DbException; /** - * Returns true if there are any acks or messages to send to the given - * contact over a transport with the given maximum latency. + * Returns true if there are any acks to send to the given contact. *

* Read-only. - * - * @param eager True if messages that are not yet due for retransmission - * should be included */ - boolean containsAnythingToSend(Transaction txn, ContactId c, - long maxLatency, boolean eager) throws DbException; + boolean containsAcksToSend(Transaction txn, ContactId c) throws DbException; /** * Returns true if the database contains the given contact for the given @@ -161,6 +156,18 @@ public interface DatabaseComponent extends TransactionManager { */ boolean containsIdentity(Transaction txn, AuthorId a) throws DbException; + /** + * Returns true if there are any messages to send to the given contact + * over a transport with the given maximum latency. + *

+ * Read-only. + * + * @param eager True if messages that are not yet due for retransmission + * should be included + */ + boolean containsMessagesToSend(Transaction txn, ContactId c, + long maxLatency, boolean eager) throws DbException; + /** * Returns true if the database contains the given pending contact. *

diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/mailbox/MailboxConstants.java b/bramble-api/src/main/java/org/briarproject/bramble/api/mailbox/MailboxConstants.java index 5a54e7f44..02c7b8a5e 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/mailbox/MailboxConstants.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/mailbox/MailboxConstants.java @@ -5,6 +5,7 @@ import org.briarproject.bramble.api.plugin.TransportId; import java.util.List; import static java.util.Collections.singletonList; +import static java.util.concurrent.TimeUnit.DAYS; import static java.util.concurrent.TimeUnit.HOURS; import static org.briarproject.bramble.api.transport.TransportConstants.MAX_FRAME_LENGTH; import static org.briarproject.bramble.api.transport.TransportConstants.MAX_PAYLOAD_LENGTH; @@ -65,4 +66,8 @@ public interface MailboxConstants { */ long PROBLEM_MS_SINCE_LAST_SUCCESS = HOURS.toMillis(1); + /** + * The maximum latency of the mailbox transport in milliseconds. + */ + long MAX_LATENCY = DAYS.toMillis(14); } diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/DeferredSendHandler.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/DeferredSendHandler.java deleted file mode 100644 index 1966b3bb6..000000000 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/DeferredSendHandler.java +++ /dev/null @@ -1,15 +0,0 @@ -package org.briarproject.bramble.api.sync; - -import java.util.Collection; - -/** - * An interface for holding the IDs of messages sent and acked during an - * outgoing {@link SyncSession} so they can be recorded in the DB as sent - * or acked at some later time. - */ -public interface DeferredSendHandler { - - void onAckSent(Collection acked); - - void onMessageSent(MessageId sent); -} diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/OutgoingSessionRecord.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/OutgoingSessionRecord.java new file mode 100644 index 000000000..6f1077a23 --- /dev/null +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/OutgoingSessionRecord.java @@ -0,0 +1,37 @@ +package org.briarproject.bramble.api.sync; + +import org.briarproject.bramble.api.nullsafety.NotNullByDefault; + +import java.util.Collection; +import java.util.concurrent.CopyOnWriteArrayList; + +import javax.annotation.concurrent.ThreadSafe; + +/** + * A container for holding the IDs of messages sent and acked during an + * outgoing {@link SyncSession}, so they can be recorded in the DB as sent + * or acked at some later time. + */ +@ThreadSafe +@NotNullByDefault +public class OutgoingSessionRecord { + + private final Collection ackedIds = new CopyOnWriteArrayList<>(); + private final Collection sentIds = new CopyOnWriteArrayList<>(); + + public void onAckSent(Collection acked) { + ackedIds.addAll(acked); + } + + public void onMessageSent(MessageId sent) { + sentIds.add(sent); + } + + public Collection getAckedIds() { + return ackedIds; + } + + public Collection getSentIds() { + return sentIds; + } +} diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java index bb54f58e4..65250938f 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java @@ -12,12 +12,30 @@ import javax.annotation.Nullable; @NotNullByDefault public interface SyncSessionFactory { + /** + * Creates a session for receiving data from a contact. + */ SyncSession createIncomingSession(ContactId c, InputStream in, PriorityHandler handler); + /** + * Creates a session for sending data to a contact over a simplex transport. + * + * @param eager True if messages should be sent eagerly, ie regardless of + * whether they're due for retransmission. + */ SyncSession createSimplexOutgoingSession(ContactId c, TransportId t, long maxLatency, boolean eager, StreamWriter streamWriter); + /** + * Creates a session for sending data to a contact via a mailbox. The IDs + * of any messages sent or acked will be added to the given + * {@link OutgoingSessionRecord}. + */ + SyncSession createSimplexOutgoingSession(ContactId c, TransportId t, + long maxLatency, StreamWriter streamWriter, + OutgoingSessionRecord sessionRecord); + SyncSession createDuplexOutgoingSession(ContactId c, TransportId t, long maxLatency, int maxIdleTime, StreamWriter streamWriter, @Nullable Priority priority); diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/event/GroupVisibilityUpdatedEvent.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/event/GroupVisibilityUpdatedEvent.java index 1bcca2c66..7d151a48d 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/event/GroupVisibilityUpdatedEvent.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/event/GroupVisibilityUpdatedEvent.java @@ -3,6 +3,7 @@ package org.briarproject.bramble.api.sync.event; import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.event.Event; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.sync.Group.Visibility; import java.util.Collection; @@ -15,12 +16,19 @@ import javax.annotation.concurrent.Immutable; @NotNullByDefault public class GroupVisibilityUpdatedEvent extends Event { + private final Visibility visibility; private final Collection affected; - public GroupVisibilityUpdatedEvent(Collection affected) { + public GroupVisibilityUpdatedEvent(Visibility visibility, + Collection affected) { + this.visibility = visibility; this.affected = affected; } + public Visibility getVisibility() { + return visibility; + } + /** * Returns the contacts affected by the update. */ diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionManagerImpl.java index 2e37ce7cd..f3224e5db 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionManagerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionManagerImpl.java @@ -13,6 +13,7 @@ import org.briarproject.bramble.api.plugin.TransportConnectionWriter; import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; import org.briarproject.bramble.api.properties.TransportPropertyManager; +import org.briarproject.bramble.api.sync.OutgoingSessionRecord; import org.briarproject.bramble.api.sync.SyncSessionFactory; import org.briarproject.bramble.api.transport.KeyManager; import org.briarproject.bramble.api.transport.StreamReaderFactory; @@ -100,7 +101,16 @@ class ConnectionManagerImpl implements ConnectionManager { TransportConnectionWriter w) { ioExecutor.execute(new OutgoingSimplexSyncConnection(keyManager, connectionRegistry, streamReaderFactory, streamWriterFactory, - syncSessionFactory, transportPropertyManager, c, t, w)); + syncSessionFactory, transportPropertyManager, c, t, w, null)); + } + + @Override + public void manageOutgoingConnection(ContactId c, TransportId t, + TransportConnectionWriter w, OutgoingSessionRecord sessionRecord) { + ioExecutor.execute(new OutgoingSimplexSyncConnection(keyManager, + connectionRegistry, streamReaderFactory, streamWriterFactory, + syncSessionFactory, transportPropertyManager, c, t, w, + sessionRecord)); } @Override diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingSimplexSyncConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingSimplexSyncConnection.java index 9bec08193..d128d9d71 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingSimplexSyncConnection.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingSimplexSyncConnection.java @@ -6,6 +6,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.TransportConnectionWriter; import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.properties.TransportPropertyManager; +import org.briarproject.bramble.api.sync.OutgoingSessionRecord; import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.SyncSessionFactory; import org.briarproject.bramble.api.transport.KeyManager; @@ -16,6 +17,8 @@ import org.briarproject.bramble.api.transport.StreamWriterFactory; import java.io.IOException; +import javax.annotation.Nullable; + import static java.util.logging.Level.WARNING; import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull; import static org.briarproject.bramble.util.LogUtils.logException; @@ -26,6 +29,8 @@ class OutgoingSimplexSyncConnection extends SyncConnection implements Runnable { private final ContactId contactId; private final TransportId transportId; private final TransportConnectionWriter writer; + @Nullable + private final OutgoingSessionRecord sessionRecord; OutgoingSimplexSyncConnection(KeyManager keyManager, ConnectionRegistry connectionRegistry, @@ -34,13 +39,15 @@ class OutgoingSimplexSyncConnection extends SyncConnection implements Runnable { SyncSessionFactory syncSessionFactory, TransportPropertyManager transportPropertyManager, ContactId contactId, TransportId transportId, - TransportConnectionWriter writer) { + TransportConnectionWriter writer, + @Nullable OutgoingSessionRecord sessionRecord) { super(keyManager, connectionRegistry, streamReaderFactory, streamWriterFactory, syncSessionFactory, transportPropertyManager); this.contactId = contactId; this.transportId = transportId; this.writer = writer; + this.sessionRecord = sessionRecord; } @Override @@ -71,10 +78,16 @@ class OutgoingSimplexSyncConnection extends SyncConnection implements Runnable { StreamWriter streamWriter = streamWriterFactory.createStreamWriter( w.getOutputStream(), ctx); ContactId c = requireNonNull(ctx.getContactId()); - // Use eager retransmission if the transport is lossy and cheap - return syncSessionFactory.createSimplexOutgoingSession(c, - ctx.getTransportId(), w.getMaxLatency(), w.isLossyAndCheap(), - streamWriter); + if (sessionRecord == null) { + // Use eager retransmission if the transport is lossy and cheap + return syncSessionFactory.createSimplexOutgoingSession(c, + ctx.getTransportId(), w.getMaxLatency(), + w.isLossyAndCheap(), streamWriter); + } else { + return syncSessionFactory.createSimplexOutgoingSession(c, + ctx.getTransportId(), w.getMaxLatency(), streamWriter, + sessionRecord); + } } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java b/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java index e0aaaecc3..1e6f4ab31 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/Database.java @@ -163,16 +163,11 @@ interface Database { throws DbException; /** - * Returns true if there are any acks or messages to send to the given - * contact over a transport with the given maximum latency. + * Returns true if there are any acks to send to the given contact. *

* Read-only. - * - * @param eager True if messages that are not yet due for retransmission - * should be included */ - boolean containsAnythingToSend(T txn, ContactId c, long maxLatency, - boolean eager) throws DbException; + boolean containsAcksToSend(T txn, ContactId c) throws DbException; /** * Returns true if the database contains the given contact for the given @@ -212,6 +207,18 @@ interface Database { */ boolean containsMessage(T txn, MessageId m) throws DbException; + /** + * Returns true if there are any messages to send to the given + * contact over a transport with the given maximum latency. + *

+ * Read-only. + * + * @param eager True if messages that are not yet due for retransmission + * should be included + */ + boolean containsMessagesToSend(T txn, ContactId c, long maxLatency, + boolean eager) throws DbException; + /** * Returns true if the database contains the given pending contact. *

diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java index 64b6fc3c3..14b033b41 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java @@ -342,12 +342,12 @@ class DatabaseComponentImpl implements DatabaseComponent { } @Override - public boolean containsAnythingToSend(Transaction transaction, ContactId c, - long maxLatency, boolean eager) throws DbException { + public boolean containsAcksToSend(Transaction transaction, ContactId c) + throws DbException { T txn = unbox(transaction); if (!db.containsContact(txn, c)) throw new NoSuchContactException(); - return db.containsAnythingToSend(txn, c, maxLatency, eager); + return db.containsAcksToSend(txn, c); } @Override @@ -373,6 +373,15 @@ class DatabaseComponentImpl implements DatabaseComponent { return db.containsIdentity(txn, a); } + @Override + public boolean containsMessagesToSend(Transaction transaction, ContactId c, + long maxLatency, boolean eager) throws DbException { + T txn = unbox(transaction); + if (!db.containsContact(txn, c)) + throw new NoSuchContactException(); + return db.containsMessagesToSend(txn, c, maxLatency, eager); + } + @Override public boolean containsPendingContact(Transaction transaction, PendingContactId p) throws DbException { @@ -1016,7 +1025,8 @@ class DatabaseComponentImpl implements DatabaseComponent { db.getGroupVisibility(txn, id).keySet(); db.removeGroup(txn, id); transaction.attach(new GroupRemovedEvent(g)); - transaction.attach(new GroupVisibilityUpdatedEvent(affected)); + transaction.attach(new GroupVisibilityUpdatedEvent(INVISIBLE, + affected)); } @Override @@ -1141,7 +1151,7 @@ class DatabaseComponentImpl implements DatabaseComponent { else if (v == INVISIBLE) db.removeGroupVisibility(txn, c, g); else db.setGroupVisibility(txn, c, g, v == SHARED); List affected = singletonList(c); - transaction.attach(new GroupVisibilityUpdatedEvent(affected)); + transaction.attach(new GroupVisibilityUpdatedEvent(v, affected)); } @Override diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java index 3c4093e8d..bf23fd683 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/JdbcDatabase.java @@ -1147,8 +1147,8 @@ abstract class JdbcDatabase implements Database { } @Override - public boolean containsAnythingToSend(Connection txn, ContactId c, - long maxLatency, boolean eager) throws DbException { + public boolean containsAcksToSend(Connection txn, ContactId c) + throws DbException { PreparedStatement ps = null; ResultSet rs = null; try { @@ -1160,34 +1160,7 @@ abstract class JdbcDatabase implements Database { boolean acksToSend = rs.next(); rs.close(); ps.close(); - if (acksToSend) return true; - if (eager) { - sql = "SELECT NULL from statuses" - + " WHERE contactId = ? AND state = ?" - + " AND groupShared = TRUE AND messageShared = TRUE" - + " AND deleted = FALSE AND seen = FALSE"; - ps = txn.prepareStatement(sql); - ps.setInt(1, c.getInt()); - ps.setInt(2, DELIVERED.getValue()); - } else { - long now = clock.currentTimeMillis(); - sql = "SELECT NULL FROM statuses" - + " WHERE contactId = ? AND state = ?" - + " AND groupShared = TRUE AND messageShared = TRUE" - + " AND deleted = FALSE AND seen = FALSE" - + " AND (expiry <= ? OR maxLatency IS NULL" - + " OR ? < maxLatency)"; - ps = txn.prepareStatement(sql); - ps.setInt(1, c.getInt()); - ps.setInt(2, DELIVERED.getValue()); - ps.setLong(3, now); - ps.setLong(4, maxLatency); - } - rs = ps.executeQuery(); - boolean messagesToSend = rs.next(); - rs.close(); - ps.close(); - return messagesToSend; + return acksToSend; } catch (SQLException e) { tryToClose(rs, LOG, WARNING); tryToClose(ps, LOG, WARNING); @@ -1307,6 +1280,46 @@ abstract class JdbcDatabase implements Database { } } + @Override + public boolean containsMessagesToSend(Connection txn, ContactId c, + long maxLatency, boolean eager) throws DbException { + PreparedStatement ps = null; + ResultSet rs = null; + try { + if (eager) { + String sql = "SELECT NULL from statuses" + + " WHERE contactId = ? AND state = ?" + + " AND groupShared = TRUE AND messageShared = TRUE" + + " AND deleted = FALSE AND seen = FALSE"; + ps = txn.prepareStatement(sql); + ps.setInt(1, c.getInt()); + ps.setInt(2, DELIVERED.getValue()); + } else { + long now = clock.currentTimeMillis(); + String sql = "SELECT NULL FROM statuses" + + " WHERE contactId = ? AND state = ?" + + " AND groupShared = TRUE AND messageShared = TRUE" + + " AND deleted = FALSE AND seen = FALSE" + + " AND (expiry <= ? OR maxLatency IS NULL" + + " OR ? < maxLatency)"; + ps = txn.prepareStatement(sql); + ps.setInt(1, c.getInt()); + ps.setInt(2, DELIVERED.getValue()); + ps.setLong(3, now); + ps.setLong(4, maxLatency); + } + rs = ps.executeQuery(); + boolean messagesToSend = rs.next(); + rs.close(); + ps.close(); + return messagesToSend; + } catch (SQLException e) { + tryToClose(rs, LOG, WARNING); + tryToClose(ps, LOG, WARNING); + throw new DbException(e); + } + } + @Override public boolean containsPendingContact(Connection txn, PendingContactId p) throws DbException { diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/MailboxPluginFactory.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/MailboxPluginFactory.java index 98bf72441..351368c84 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/MailboxPluginFactory.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/MailboxPluginFactory.java @@ -9,14 +9,12 @@ import org.briarproject.bramble.api.plugin.simplex.SimplexPluginFactory; import javax.annotation.Nullable; import javax.inject.Inject; -import static java.util.concurrent.TimeUnit.DAYS; import static org.briarproject.bramble.api.mailbox.MailboxConstants.ID; +import static org.briarproject.bramble.api.mailbox.MailboxConstants.MAX_LATENCY; @NotNullByDefault public class MailboxPluginFactory implements SimplexPluginFactory { - private static final long MAX_LATENCY = DAYS.toMillis(14); - @Inject MailboxPluginFactory() { } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveManagerImpl.java index 386463dd7..c7a250d60 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveManagerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/file/RemovableDriveManagerImpl.java @@ -106,7 +106,8 @@ class RemovableDriveManagerImpl @Override public boolean isWriterTaskNeeded(ContactId c) throws DbException { return db.transactionWithResult(true, txn -> - db.containsAnythingToSend(txn, c, MAX_LATENCY, true)); + db.containsAcksToSend(txn, c) || + db.containsMessagesToSend(txn, c, MAX_LATENCY, true)); } @Override diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java index 349b8d37b..4bfb17bb0 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java @@ -50,6 +50,7 @@ import static java.util.logging.Level.WARNING; import static java.util.logging.Logger.getLogger; import static org.briarproject.bramble.api.lifecycle.LifecycleManager.LifecycleState.STOPPING; import static org.briarproject.bramble.api.record.Record.RECORD_HEADER_BYTES; +import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED; import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS; import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_LENGTH; import static org.briarproject.bramble.api.sync.SyncConstants.SUPPORTED_VERSIONS; @@ -235,8 +236,10 @@ class DuplexOutgoingSession implements SyncSession, EventListener { generateOffer(); } else if (e instanceof GroupVisibilityUpdatedEvent) { GroupVisibilityUpdatedEvent g = (GroupVisibilityUpdatedEvent) e; - if (g.getAffectedContacts().contains(contactId)) + if (g.getVisibility() == SHARED && + g.getAffectedContacts().contains(contactId)) { generateOffer(); + } } else if (e instanceof MessageRequestedEvent) { if (((MessageRequestedEvent) e).getContactId().equals(contactId)) generateBatch(); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java index 5d4e06fb4..3dcf95a3a 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/MailboxOutgoingSession.java @@ -7,9 +7,9 @@ import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.sync.Ack; -import org.briarproject.bramble.api.sync.DeferredSendHandler; import org.briarproject.bramble.api.sync.Message; import org.briarproject.bramble.api.sync.MessageId; +import org.briarproject.bramble.api.sync.OutgoingSessionRecord; import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.transport.StreamWriter; @@ -29,7 +29,7 @@ import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LEN /** * A {@link SimplexOutgoingSession} for sending and acking messages via a - * mailbox. The session uses a {@link DeferredSendHandler} to record the IDs + * mailbox. The session uses a {@link OutgoingSessionRecord} to record the IDs * of the messages sent and acked during the session so that they can be * recorded in the DB as sent or acked after the file has been successfully * uploaded to the mailbox. @@ -41,7 +41,7 @@ class MailboxOutgoingSession extends SimplexOutgoingSession { private static final Logger LOG = getLogger(MailboxOutgoingSession.class.getName()); - private final DeferredSendHandler deferredSendHandler; + private final OutgoingSessionRecord sessionRecord; private final long initialCapacity; MailboxOutgoingSession(DatabaseComponent db, @@ -51,11 +51,11 @@ class MailboxOutgoingSession extends SimplexOutgoingSession { long maxLatency, StreamWriter streamWriter, SyncRecordWriter recordWriter, - DeferredSendHandler deferredSendHandler, + OutgoingSessionRecord sessionRecord, long capacity) { super(db, eventBus, contactId, transportId, maxLatency, streamWriter, recordWriter); - this.deferredSendHandler = deferredSendHandler; + this.sessionRecord = sessionRecord; this.initialCapacity = capacity; } @@ -65,7 +65,7 @@ class MailboxOutgoingSession extends SimplexOutgoingSession { Collection idsToAck = loadMessageIdsToAck(); if (idsToAck.isEmpty()) break; recordWriter.writeAck(new Ack(idsToAck)); - deferredSendHandler.onAckSent(idsToAck); + sessionRecord.onAckSent(idsToAck); LOG.info("Sent ack"); } } @@ -96,7 +96,7 @@ class MailboxOutgoingSession extends SimplexOutgoingSession { db.getMessageToSend(txn, contactId, m, maxLatency, false)); if (message == null) continue; // No longer shared recordWriter.writeMessage(message); - deferredSendHandler.onMessageSent(m); + sessionRecord.onMessageSent(m); LOG.info("Sent message"); } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java index fef6d7e45..fcd599af5 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java @@ -6,6 +6,7 @@ import org.briarproject.bramble.api.db.DatabaseExecutor; import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.TransportId; +import org.briarproject.bramble.api.sync.OutgoingSessionRecord; import org.briarproject.bramble.api.sync.Priority; import org.briarproject.bramble.api.sync.PriorityHandler; import org.briarproject.bramble.api.sync.SyncRecordReader; @@ -25,6 +26,8 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; import javax.inject.Inject; +import static org.briarproject.bramble.api.mailbox.MailboxConstants.MAX_FILE_PAYLOAD_BYTES; + @Immutable @NotNullByDefault class SyncSessionFactoryImpl implements SyncSessionFactory { @@ -73,6 +76,18 @@ class SyncSessionFactoryImpl implements SyncSessionFactory { } } + @Override + public SyncSession createSimplexOutgoingSession(ContactId c, TransportId t, + long maxLatency, StreamWriter streamWriter, + OutgoingSessionRecord sessionRecord) { + OutputStream out = streamWriter.getOutputStream(); + SyncRecordWriter recordWriter = + recordWriterFactory.createRecordWriter(out); + return new MailboxOutgoingSession(db, eventBus, c, t, maxLatency, + streamWriter, recordWriter, sessionRecord, + MAX_FILE_PAYLOAD_BYTES); + } + @Override public SyncSession createDuplexOutgoingSession(ContactId c, TransportId t, long maxLatency, int maxIdleTime, StreamWriter streamWriter, diff --git a/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java b/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java index e288e5507..2e342186a 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/db/DatabaseComponentImplTest.java @@ -303,11 +303,11 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { throws Exception { context.checking(new Expectations() {{ // Check whether the contact is in the DB (which it's not) - exactly(25).of(database).startTransaction(); + exactly(27).of(database).startTransaction(); will(returnValue(txn)); - exactly(25).of(database).containsContact(txn, contactId); + exactly(27).of(database).containsContact(txn, contactId); will(returnValue(false)); - exactly(25).of(database).abortTransaction(txn); + exactly(27).of(database).abortTransaction(txn); }}); DatabaseComponent db = createDatabaseComponent(database, eventBus, eventExecutor, shutdownManager); @@ -321,6 +321,23 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase { // Expected } + try { + db.transaction(true, transaction -> + db.containsAcksToSend(transaction, contactId)); + fail(); + } catch (NoSuchContactException expected) { + // Expected + } + + try { + db.transaction(true, transaction -> + db.containsMessagesToSend(transaction, contactId, + 123, true)); + fail(); + } catch (NoSuchContactException expected) { + // Expected + } + try { db.transaction(false, transaction -> db.generateAck(transaction, contactId, 123)); diff --git a/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java b/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java index 567a89c60..54d19da0d 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/db/JdbcDatabaseTest.java @@ -378,9 +378,9 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { // Initially there should be nothing to send assertFalse( - db.containsAnythingToSend(txn, contactId, MAX_LATENCY, false)); + db.containsMessagesToSend(txn, contactId, MAX_LATENCY, false)); assertFalse( - db.containsAnythingToSend(txn, contactId, MAX_LATENCY, true)); + db.containsMessagesToSend(txn, contactId, MAX_LATENCY, true)); // Add some messages to ack Message message1 = getMessage(groupId); @@ -389,10 +389,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { db.addMessage(txn, message1, DELIVERED, true, false, contactId); // Both message IDs should be returned - assertTrue( - db.containsAnythingToSend(txn, contactId, MAX_LATENCY, false)); - assertTrue( - db.containsAnythingToSend(txn, contactId, MAX_LATENCY, true)); + assertTrue(db.containsAcksToSend(txn, contactId)); Collection ids = db.getMessagesToAck(txn, contactId, 1234); assertEquals(asList(messageId, messageId1), ids); @@ -400,10 +397,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { db.lowerAckFlag(txn, contactId, asList(messageId, messageId1)); // No message IDs should be returned - assertFalse( - db.containsAnythingToSend(txn, contactId, MAX_LATENCY, false)); - assertFalse( - db.containsAnythingToSend(txn, contactId, MAX_LATENCY, true)); + assertFalse(db.containsAcksToSend(txn, contactId)); assertEquals(emptyList(), db.getMessagesToAck(txn, contactId, 1234)); // Raise the ack flag again @@ -411,10 +405,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { db.raiseAckFlag(txn, contactId, messageId1); // Both message IDs should be returned - assertTrue( - db.containsAnythingToSend(txn, contactId, MAX_LATENCY, false)); - assertTrue( - db.containsAnythingToSend(txn, contactId, MAX_LATENCY, true)); + assertTrue(db.containsAcksToSend(txn, contactId)); ids = db.getMessagesToAck(txn, contactId, 1234); assertEquals(asList(messageId, messageId1), ids); @@ -2579,7 +2570,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { private void assertNothingToSendLazily(Database db, Connection txn) throws Exception { assertFalse( - db.containsAnythingToSend(txn, contactId, MAX_LATENCY, false)); + db.containsMessagesToSend(txn, contactId, MAX_LATENCY, false)); Collection ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY); assertTrue(ids.isEmpty()); @@ -2590,7 +2581,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { private void assertOneMessageToSendLazily(Database db, Connection txn) throws Exception { assertTrue( - db.containsAnythingToSend(txn, contactId, MAX_LATENCY, false)); + db.containsMessagesToSend(txn, contactId, MAX_LATENCY, false)); Collection ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY); assertEquals(singletonList(messageId), ids); @@ -2601,7 +2592,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { private void assertNothingToSendEagerly(Database db, Connection txn) throws Exception { assertFalse( - db.containsAnythingToSend(txn, contactId, MAX_LATENCY, true)); + db.containsMessagesToSend(txn, contactId, MAX_LATENCY, true)); Collection unacked = db.getUnackedMessagesToSend(txn, contactId); assertTrue(unacked.isEmpty()); @@ -2611,7 +2602,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase { private void assertOneMessageToSendEagerly(Database db, Connection txn) throws Exception { assertTrue( - db.containsAnythingToSend(txn, contactId, MAX_LATENCY, true)); + db.containsMessagesToSend(txn, contactId, MAX_LATENCY, true)); Collection unacked = db.getUnackedMessagesToSend(txn, contactId); assertEquals(singletonList(messageId), unacked); diff --git a/bramble-core/src/test/java/org/briarproject/bramble/sync/MailboxOutgoingSessionTest.java b/bramble-core/src/test/java/org/briarproject/bramble/sync/MailboxOutgoingSessionTest.java index 904882847..814599358 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/sync/MailboxOutgoingSessionTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/sync/MailboxOutgoingSessionTest.java @@ -6,10 +6,10 @@ import org.briarproject.bramble.api.db.Transaction; import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.sync.Ack; -import org.briarproject.bramble.api.sync.DeferredSendHandler; import org.briarproject.bramble.api.sync.GroupId; import org.briarproject.bramble.api.sync.Message; import org.briarproject.bramble.api.sync.MessageId; +import org.briarproject.bramble.api.sync.OutgoingSessionRecord; import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.sync.Versions; import org.briarproject.bramble.api.transport.StreamWriter; @@ -30,6 +30,7 @@ import static org.briarproject.bramble.test.TestUtils.getContactId; import static org.briarproject.bramble.test.TestUtils.getMessage; import static org.briarproject.bramble.test.TestUtils.getRandomId; import static org.briarproject.bramble.test.TestUtils.getTransportId; +import static org.junit.Assert.assertEquals; public class MailboxOutgoingSessionTest extends BrambleMockTestCase { @@ -40,8 +41,6 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { private final StreamWriter streamWriter = context.mock(StreamWriter.class); private final SyncRecordWriter recordWriter = context.mock(SyncRecordWriter.class); - private final DeferredSendHandler deferredSendHandler = - context.mock(DeferredSendHandler.class); private final ContactId contactId = getContactId(); private final TransportId transportId = getTransportId(); @@ -53,9 +52,10 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { @Test public void testNothingToSend() throws Exception { + OutgoingSessionRecord sessionRecord = new OutgoingSessionRecord(); MailboxOutgoingSession session = new MailboxOutgoingSession(db, eventBus, contactId, transportId, MAX_LATENCY, - streamWriter, recordWriter, deferredSendHandler, + streamWriter, recordWriter, sessionRecord, MAX_FILE_PAYLOAD_BYTES); Transaction noAckIdTxn = new Transaction(null, true); @@ -92,13 +92,17 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { }}); session.run(); + + assertEquals(emptyList(), sessionRecord.getAckedIds()); + assertEquals(emptyList(), sessionRecord.getSentIds()); } @Test public void testSomethingToSend() throws Exception { + OutgoingSessionRecord sessionRecord = new OutgoingSessionRecord(); MailboxOutgoingSession session = new MailboxOutgoingSession(db, eventBus, contactId, transportId, MAX_LATENCY, - streamWriter, recordWriter, deferredSendHandler, + streamWriter, recordWriter, sessionRecord, MAX_FILE_PAYLOAD_BYTES); Transaction ackIdTxn = new Transaction(null, true); @@ -127,8 +131,6 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { oneOf(recordWriter).getBytesWritten(); will(returnValue((long) versionRecordBytes)); oneOf(recordWriter).writeAck(with(any(Ack.class))); - oneOf(deferredSendHandler) - .onAckSent(singletonList(message.getId())); // No more messages to ack oneOf(db).transactionWithResult(with(true), withDbCallable(noAckIdTxn)); @@ -150,7 +152,6 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { MAX_LATENCY, false); will(returnValue(message1)); oneOf(recordWriter).writeMessage(message1); - oneOf(deferredSendHandler).onMessageSent(message1.getId()); // Send the end of stream marker oneOf(streamWriter).sendEndOfStream(); // Remove listener @@ -158,6 +159,11 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { }}); session.run(); + + assertEquals(singletonList(message.getId()), + sessionRecord.getAckedIds()); + assertEquals(singletonList(message1.getId()), + sessionRecord.getSentIds()); } @Test @@ -167,9 +173,10 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { long capacity = RECORD_HEADER_BYTES + MessageId.LENGTH * MAX_MESSAGE_IDS + RECORD_HEADER_BYTES + MessageId.LENGTH + MessageId.LENGTH - 1; + OutgoingSessionRecord sessionRecord = new OutgoingSessionRecord(); MailboxOutgoingSession session = new MailboxOutgoingSession(db, eventBus, contactId, transportId, MAX_LATENCY, - streamWriter, recordWriter, deferredSendHandler, capacity); + streamWriter, recordWriter, sessionRecord, capacity); Transaction ackIdTxn1 = new Transaction(null, true); Transaction ackIdTxn2 = new Transaction(null, true); @@ -184,6 +191,9 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { } List idsInSecondAck = singletonList(new MessageId(getRandomId())); + List allIds = new ArrayList<>(MAX_MESSAGE_IDS + 1); + allIds.addAll(idsInFirstAck); + allIds.addAll(idsInSecondAck); context.checking(new DbExpectations() {{ // Add listener @@ -200,7 +210,6 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { will(returnValue(idsInFirstAck)); // Send the first ack record oneOf(recordWriter).writeAck(with(any(Ack.class))); - oneOf(deferredSendHandler).onAckSent(idsInFirstAck); // Calculate remaining capacity for acks oneOf(recordWriter).getBytesWritten(); will(returnValue((long) versionRecordBytes + firstAckRecordBytes)); @@ -211,7 +220,6 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { will(returnValue(idsInSecondAck)); // Send the second ack record oneOf(recordWriter).writeAck(with(any(Ack.class))); - oneOf(deferredSendHandler).onAckSent(idsInSecondAck); // Not enough capacity left for another ack oneOf(recordWriter).getBytesWritten(); will(returnValue((long) versionRecordBytes + firstAckRecordBytes @@ -227,5 +235,8 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase { }}); session.run(); + + assertEquals(allIds, sessionRecord.getAckedIds()); + assertEquals(emptyList(), sessionRecord.getSentIds()); } }