From ff9f70667055d2d588adbc0ed5a2479330641900 Mon Sep 17 00:00:00 2001 From: akwizgran Date: Thu, 16 Jun 2022 13:27:11 +0100 Subject: [PATCH] Add plumbing for creating outgoing sync sessions. --- .../api/connection/ConnectionManager.java | 9 ++++++++ .../bramble/api/sync/SyncSessionFactory.java | 18 +++++++++++++++ .../connection/ConnectionManagerImpl.java | 12 +++++++++- .../OutgoingSimplexSyncConnection.java | 23 +++++++++++++++---- .../bramble/sync/SyncSessionFactoryImpl.java | 15 ++++++++++++ 5 files changed, 71 insertions(+), 6 deletions(-) diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/connection/ConnectionManager.java b/bramble-api/src/main/java/org/briarproject/bramble/api/connection/ConnectionManager.java index 3d7df9bea..63a9b36ae 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/connection/ConnectionManager.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/connection/ConnectionManager.java @@ -7,6 +7,7 @@ import org.briarproject.bramble.api.plugin.TransportConnectionReader; import org.briarproject.bramble.api.plugin.TransportConnectionWriter; import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; +import org.briarproject.bramble.api.sync.OutgoingSessionRecord; @NotNullByDefault public interface ConnectionManager { @@ -45,6 +46,14 @@ public interface ConnectionManager { void manageOutgoingConnection(ContactId c, TransportId t, TransportConnectionWriter w); + /** + * Manages an outgoing connection to a contact via a mailbox. The IDs of + * any messages sent or acked are added to the given + * {@link OutgoingSessionRecord}. + */ + void manageOutgoingConnection(ContactId c, TransportId t, + TransportConnectionWriter w, OutgoingSessionRecord sessionRecord); + /** * Manages an outgoing connection to a contact over a duplex transport. */ diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java index bb54f58e4..65250938f 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java @@ -12,12 +12,30 @@ import javax.annotation.Nullable; @NotNullByDefault public interface SyncSessionFactory { + /** + * Creates a session for receiving data from a contact. + */ SyncSession createIncomingSession(ContactId c, InputStream in, PriorityHandler handler); + /** + * Creates a session for sending data to a contact over a simplex transport. + * + * @param eager True if messages should be sent eagerly, ie regardless of + * whether they're due for retransmission. + */ SyncSession createSimplexOutgoingSession(ContactId c, TransportId t, long maxLatency, boolean eager, StreamWriter streamWriter); + /** + * Creates a session for sending data to a contact via a mailbox. The IDs + * of any messages sent or acked will be added to the given + * {@link OutgoingSessionRecord}. + */ + SyncSession createSimplexOutgoingSession(ContactId c, TransportId t, + long maxLatency, StreamWriter streamWriter, + OutgoingSessionRecord sessionRecord); + SyncSession createDuplexOutgoingSession(ContactId c, TransportId t, long maxLatency, int maxIdleTime, StreamWriter streamWriter, @Nullable Priority priority); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionManagerImpl.java index 2e37ce7cd..f3224e5db 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionManagerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/ConnectionManagerImpl.java @@ -13,6 +13,7 @@ import org.briarproject.bramble.api.plugin.TransportConnectionWriter; import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; import org.briarproject.bramble.api.properties.TransportPropertyManager; +import org.briarproject.bramble.api.sync.OutgoingSessionRecord; import org.briarproject.bramble.api.sync.SyncSessionFactory; import org.briarproject.bramble.api.transport.KeyManager; import org.briarproject.bramble.api.transport.StreamReaderFactory; @@ -100,7 +101,16 @@ class ConnectionManagerImpl implements ConnectionManager { TransportConnectionWriter w) { ioExecutor.execute(new OutgoingSimplexSyncConnection(keyManager, connectionRegistry, streamReaderFactory, streamWriterFactory, - syncSessionFactory, transportPropertyManager, c, t, w)); + syncSessionFactory, transportPropertyManager, c, t, w, null)); + } + + @Override + public void manageOutgoingConnection(ContactId c, TransportId t, + TransportConnectionWriter w, OutgoingSessionRecord sessionRecord) { + ioExecutor.execute(new OutgoingSimplexSyncConnection(keyManager, + connectionRegistry, streamReaderFactory, streamWriterFactory, + syncSessionFactory, transportPropertyManager, c, t, w, + sessionRecord)); } @Override diff --git a/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingSimplexSyncConnection.java b/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingSimplexSyncConnection.java index 9bec08193..d128d9d71 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingSimplexSyncConnection.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/connection/OutgoingSimplexSyncConnection.java @@ -6,6 +6,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.TransportConnectionWriter; import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.properties.TransportPropertyManager; +import org.briarproject.bramble.api.sync.OutgoingSessionRecord; import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.SyncSessionFactory; import org.briarproject.bramble.api.transport.KeyManager; @@ -16,6 +17,8 @@ import org.briarproject.bramble.api.transport.StreamWriterFactory; import java.io.IOException; +import javax.annotation.Nullable; + import static java.util.logging.Level.WARNING; import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull; import static org.briarproject.bramble.util.LogUtils.logException; @@ -26,6 +29,8 @@ class OutgoingSimplexSyncConnection extends SyncConnection implements Runnable { private final ContactId contactId; private final TransportId transportId; private final TransportConnectionWriter writer; + @Nullable + private final OutgoingSessionRecord sessionRecord; OutgoingSimplexSyncConnection(KeyManager keyManager, ConnectionRegistry connectionRegistry, @@ -34,13 +39,15 @@ class OutgoingSimplexSyncConnection extends SyncConnection implements Runnable { SyncSessionFactory syncSessionFactory, TransportPropertyManager transportPropertyManager, ContactId contactId, TransportId transportId, - TransportConnectionWriter writer) { + TransportConnectionWriter writer, + @Nullable OutgoingSessionRecord sessionRecord) { super(keyManager, connectionRegistry, streamReaderFactory, streamWriterFactory, syncSessionFactory, transportPropertyManager); this.contactId = contactId; this.transportId = transportId; this.writer = writer; + this.sessionRecord = sessionRecord; } @Override @@ -71,10 +78,16 @@ class OutgoingSimplexSyncConnection extends SyncConnection implements Runnable { StreamWriter streamWriter = streamWriterFactory.createStreamWriter( w.getOutputStream(), ctx); ContactId c = requireNonNull(ctx.getContactId()); - // Use eager retransmission if the transport is lossy and cheap - return syncSessionFactory.createSimplexOutgoingSession(c, - ctx.getTransportId(), w.getMaxLatency(), w.isLossyAndCheap(), - streamWriter); + if (sessionRecord == null) { + // Use eager retransmission if the transport is lossy and cheap + return syncSessionFactory.createSimplexOutgoingSession(c, + ctx.getTransportId(), w.getMaxLatency(), + w.isLossyAndCheap(), streamWriter); + } else { + return syncSessionFactory.createSimplexOutgoingSession(c, + ctx.getTransportId(), w.getMaxLatency(), streamWriter, + sessionRecord); + } } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java index fef6d7e45..fcd599af5 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java @@ -6,6 +6,7 @@ import org.briarproject.bramble.api.db.DatabaseExecutor; import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.plugin.TransportId; +import org.briarproject.bramble.api.sync.OutgoingSessionRecord; import org.briarproject.bramble.api.sync.Priority; import org.briarproject.bramble.api.sync.PriorityHandler; import org.briarproject.bramble.api.sync.SyncRecordReader; @@ -25,6 +26,8 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; import javax.inject.Inject; +import static org.briarproject.bramble.api.mailbox.MailboxConstants.MAX_FILE_PAYLOAD_BYTES; + @Immutable @NotNullByDefault class SyncSessionFactoryImpl implements SyncSessionFactory { @@ -73,6 +76,18 @@ class SyncSessionFactoryImpl implements SyncSessionFactory { } } + @Override + public SyncSession createSimplexOutgoingSession(ContactId c, TransportId t, + long maxLatency, StreamWriter streamWriter, + OutgoingSessionRecord sessionRecord) { + OutputStream out = streamWriter.getOutputStream(); + SyncRecordWriter recordWriter = + recordWriterFactory.createRecordWriter(out); + return new MailboxOutgoingSession(db, eventBus, c, t, maxLatency, + streamWriter, recordWriter, sessionRecord, + MAX_FILE_PAYLOAD_BYTES); + } + @Override public SyncSession createDuplexOutgoingSession(ContactId c, TransportId t, long maxLatency, int maxIdleTime, StreamWriter streamWriter,