mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-19 06:09:55 +01:00
Add plumbing for creating outgoing sync sessions.
This commit is contained in:
@@ -7,6 +7,7 @@ import org.briarproject.bramble.api.plugin.TransportConnectionReader;
|
|||||||
import org.briarproject.bramble.api.plugin.TransportConnectionWriter;
|
import org.briarproject.bramble.api.plugin.TransportConnectionWriter;
|
||||||
import org.briarproject.bramble.api.plugin.TransportId;
|
import org.briarproject.bramble.api.plugin.TransportId;
|
||||||
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
|
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
|
||||||
|
import org.briarproject.bramble.api.sync.OutgoingSessionRecord;
|
||||||
|
|
||||||
@NotNullByDefault
|
@NotNullByDefault
|
||||||
public interface ConnectionManager {
|
public interface ConnectionManager {
|
||||||
@@ -45,6 +46,14 @@ public interface ConnectionManager {
|
|||||||
void manageOutgoingConnection(ContactId c, TransportId t,
|
void manageOutgoingConnection(ContactId c, TransportId t,
|
||||||
TransportConnectionWriter w);
|
TransportConnectionWriter w);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Manages an outgoing connection to a contact via a mailbox. The IDs of
|
||||||
|
* any messages sent or acked are added to the given
|
||||||
|
* {@link OutgoingSessionRecord}.
|
||||||
|
*/
|
||||||
|
void manageOutgoingConnection(ContactId c, TransportId t,
|
||||||
|
TransportConnectionWriter w, OutgoingSessionRecord sessionRecord);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Manages an outgoing connection to a contact over a duplex transport.
|
* Manages an outgoing connection to a contact over a duplex transport.
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -12,12 +12,30 @@ import javax.annotation.Nullable;
|
|||||||
@NotNullByDefault
|
@NotNullByDefault
|
||||||
public interface SyncSessionFactory {
|
public interface SyncSessionFactory {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a session for receiving data from a contact.
|
||||||
|
*/
|
||||||
SyncSession createIncomingSession(ContactId c, InputStream in,
|
SyncSession createIncomingSession(ContactId c, InputStream in,
|
||||||
PriorityHandler handler);
|
PriorityHandler handler);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a session for sending data to a contact over a simplex transport.
|
||||||
|
*
|
||||||
|
* @param eager True if messages should be sent eagerly, ie regardless of
|
||||||
|
* whether they're due for retransmission.
|
||||||
|
*/
|
||||||
SyncSession createSimplexOutgoingSession(ContactId c, TransportId t,
|
SyncSession createSimplexOutgoingSession(ContactId c, TransportId t,
|
||||||
long maxLatency, boolean eager, StreamWriter streamWriter);
|
long maxLatency, boolean eager, StreamWriter streamWriter);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a session for sending data to a contact via a mailbox. The IDs
|
||||||
|
* of any messages sent or acked will be added to the given
|
||||||
|
* {@link OutgoingSessionRecord}.
|
||||||
|
*/
|
||||||
|
SyncSession createSimplexOutgoingSession(ContactId c, TransportId t,
|
||||||
|
long maxLatency, StreamWriter streamWriter,
|
||||||
|
OutgoingSessionRecord sessionRecord);
|
||||||
|
|
||||||
SyncSession createDuplexOutgoingSession(ContactId c, TransportId t,
|
SyncSession createDuplexOutgoingSession(ContactId c, TransportId t,
|
||||||
long maxLatency, int maxIdleTime, StreamWriter streamWriter,
|
long maxLatency, int maxIdleTime, StreamWriter streamWriter,
|
||||||
@Nullable Priority priority);
|
@Nullable Priority priority);
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ import org.briarproject.bramble.api.plugin.TransportConnectionWriter;
|
|||||||
import org.briarproject.bramble.api.plugin.TransportId;
|
import org.briarproject.bramble.api.plugin.TransportId;
|
||||||
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
|
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
|
||||||
import org.briarproject.bramble.api.properties.TransportPropertyManager;
|
import org.briarproject.bramble.api.properties.TransportPropertyManager;
|
||||||
|
import org.briarproject.bramble.api.sync.OutgoingSessionRecord;
|
||||||
import org.briarproject.bramble.api.sync.SyncSessionFactory;
|
import org.briarproject.bramble.api.sync.SyncSessionFactory;
|
||||||
import org.briarproject.bramble.api.transport.KeyManager;
|
import org.briarproject.bramble.api.transport.KeyManager;
|
||||||
import org.briarproject.bramble.api.transport.StreamReaderFactory;
|
import org.briarproject.bramble.api.transport.StreamReaderFactory;
|
||||||
@@ -100,7 +101,16 @@ class ConnectionManagerImpl implements ConnectionManager {
|
|||||||
TransportConnectionWriter w) {
|
TransportConnectionWriter w) {
|
||||||
ioExecutor.execute(new OutgoingSimplexSyncConnection(keyManager,
|
ioExecutor.execute(new OutgoingSimplexSyncConnection(keyManager,
|
||||||
connectionRegistry, streamReaderFactory, streamWriterFactory,
|
connectionRegistry, streamReaderFactory, streamWriterFactory,
|
||||||
syncSessionFactory, transportPropertyManager, c, t, w));
|
syncSessionFactory, transportPropertyManager, c, t, w, null));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void manageOutgoingConnection(ContactId c, TransportId t,
|
||||||
|
TransportConnectionWriter w, OutgoingSessionRecord sessionRecord) {
|
||||||
|
ioExecutor.execute(new OutgoingSimplexSyncConnection(keyManager,
|
||||||
|
connectionRegistry, streamReaderFactory, streamWriterFactory,
|
||||||
|
syncSessionFactory, transportPropertyManager, c, t, w,
|
||||||
|
sessionRecord));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
|||||||
import org.briarproject.bramble.api.plugin.TransportConnectionWriter;
|
import org.briarproject.bramble.api.plugin.TransportConnectionWriter;
|
||||||
import org.briarproject.bramble.api.plugin.TransportId;
|
import org.briarproject.bramble.api.plugin.TransportId;
|
||||||
import org.briarproject.bramble.api.properties.TransportPropertyManager;
|
import org.briarproject.bramble.api.properties.TransportPropertyManager;
|
||||||
|
import org.briarproject.bramble.api.sync.OutgoingSessionRecord;
|
||||||
import org.briarproject.bramble.api.sync.SyncSession;
|
import org.briarproject.bramble.api.sync.SyncSession;
|
||||||
import org.briarproject.bramble.api.sync.SyncSessionFactory;
|
import org.briarproject.bramble.api.sync.SyncSessionFactory;
|
||||||
import org.briarproject.bramble.api.transport.KeyManager;
|
import org.briarproject.bramble.api.transport.KeyManager;
|
||||||
@@ -16,6 +17,8 @@ import org.briarproject.bramble.api.transport.StreamWriterFactory;
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
import static java.util.logging.Level.WARNING;
|
import static java.util.logging.Level.WARNING;
|
||||||
import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull;
|
import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull;
|
||||||
import static org.briarproject.bramble.util.LogUtils.logException;
|
import static org.briarproject.bramble.util.LogUtils.logException;
|
||||||
@@ -26,6 +29,8 @@ class OutgoingSimplexSyncConnection extends SyncConnection implements Runnable {
|
|||||||
private final ContactId contactId;
|
private final ContactId contactId;
|
||||||
private final TransportId transportId;
|
private final TransportId transportId;
|
||||||
private final TransportConnectionWriter writer;
|
private final TransportConnectionWriter writer;
|
||||||
|
@Nullable
|
||||||
|
private final OutgoingSessionRecord sessionRecord;
|
||||||
|
|
||||||
OutgoingSimplexSyncConnection(KeyManager keyManager,
|
OutgoingSimplexSyncConnection(KeyManager keyManager,
|
||||||
ConnectionRegistry connectionRegistry,
|
ConnectionRegistry connectionRegistry,
|
||||||
@@ -34,13 +39,15 @@ class OutgoingSimplexSyncConnection extends SyncConnection implements Runnable {
|
|||||||
SyncSessionFactory syncSessionFactory,
|
SyncSessionFactory syncSessionFactory,
|
||||||
TransportPropertyManager transportPropertyManager,
|
TransportPropertyManager transportPropertyManager,
|
||||||
ContactId contactId, TransportId transportId,
|
ContactId contactId, TransportId transportId,
|
||||||
TransportConnectionWriter writer) {
|
TransportConnectionWriter writer,
|
||||||
|
@Nullable OutgoingSessionRecord sessionRecord) {
|
||||||
super(keyManager, connectionRegistry, streamReaderFactory,
|
super(keyManager, connectionRegistry, streamReaderFactory,
|
||||||
streamWriterFactory, syncSessionFactory,
|
streamWriterFactory, syncSessionFactory,
|
||||||
transportPropertyManager);
|
transportPropertyManager);
|
||||||
this.contactId = contactId;
|
this.contactId = contactId;
|
||||||
this.transportId = transportId;
|
this.transportId = transportId;
|
||||||
this.writer = writer;
|
this.writer = writer;
|
||||||
|
this.sessionRecord = sessionRecord;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -71,10 +78,16 @@ class OutgoingSimplexSyncConnection extends SyncConnection implements Runnable {
|
|||||||
StreamWriter streamWriter = streamWriterFactory.createStreamWriter(
|
StreamWriter streamWriter = streamWriterFactory.createStreamWriter(
|
||||||
w.getOutputStream(), ctx);
|
w.getOutputStream(), ctx);
|
||||||
ContactId c = requireNonNull(ctx.getContactId());
|
ContactId c = requireNonNull(ctx.getContactId());
|
||||||
// Use eager retransmission if the transport is lossy and cheap
|
if (sessionRecord == null) {
|
||||||
return syncSessionFactory.createSimplexOutgoingSession(c,
|
// Use eager retransmission if the transport is lossy and cheap
|
||||||
ctx.getTransportId(), w.getMaxLatency(), w.isLossyAndCheap(),
|
return syncSessionFactory.createSimplexOutgoingSession(c,
|
||||||
streamWriter);
|
ctx.getTransportId(), w.getMaxLatency(),
|
||||||
|
w.isLossyAndCheap(), streamWriter);
|
||||||
|
} else {
|
||||||
|
return syncSessionFactory.createSimplexOutgoingSession(c,
|
||||||
|
ctx.getTransportId(), w.getMaxLatency(), streamWriter,
|
||||||
|
sessionRecord);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import org.briarproject.bramble.api.db.DatabaseExecutor;
|
|||||||
import org.briarproject.bramble.api.event.EventBus;
|
import org.briarproject.bramble.api.event.EventBus;
|
||||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||||
import org.briarproject.bramble.api.plugin.TransportId;
|
import org.briarproject.bramble.api.plugin.TransportId;
|
||||||
|
import org.briarproject.bramble.api.sync.OutgoingSessionRecord;
|
||||||
import org.briarproject.bramble.api.sync.Priority;
|
import org.briarproject.bramble.api.sync.Priority;
|
||||||
import org.briarproject.bramble.api.sync.PriorityHandler;
|
import org.briarproject.bramble.api.sync.PriorityHandler;
|
||||||
import org.briarproject.bramble.api.sync.SyncRecordReader;
|
import org.briarproject.bramble.api.sync.SyncRecordReader;
|
||||||
@@ -25,6 +26,8 @@ import javax.annotation.Nullable;
|
|||||||
import javax.annotation.concurrent.Immutable;
|
import javax.annotation.concurrent.Immutable;
|
||||||
import javax.inject.Inject;
|
import javax.inject.Inject;
|
||||||
|
|
||||||
|
import static org.briarproject.bramble.api.mailbox.MailboxConstants.MAX_FILE_PAYLOAD_BYTES;
|
||||||
|
|
||||||
@Immutable
|
@Immutable
|
||||||
@NotNullByDefault
|
@NotNullByDefault
|
||||||
class SyncSessionFactoryImpl implements SyncSessionFactory {
|
class SyncSessionFactoryImpl implements SyncSessionFactory {
|
||||||
@@ -73,6 +76,18 @@ class SyncSessionFactoryImpl implements SyncSessionFactory {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SyncSession createSimplexOutgoingSession(ContactId c, TransportId t,
|
||||||
|
long maxLatency, StreamWriter streamWriter,
|
||||||
|
OutgoingSessionRecord sessionRecord) {
|
||||||
|
OutputStream out = streamWriter.getOutputStream();
|
||||||
|
SyncRecordWriter recordWriter =
|
||||||
|
recordWriterFactory.createRecordWriter(out);
|
||||||
|
return new MailboxOutgoingSession(db, eventBus, c, t, maxLatency,
|
||||||
|
streamWriter, recordWriter, sessionRecord,
|
||||||
|
MAX_FILE_PAYLOAD_BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SyncSession createDuplexOutgoingSession(ContactId c, TransportId t,
|
public SyncSession createDuplexOutgoingSession(ContactId c, TransportId t,
|
||||||
long maxLatency, int maxIdleTime, StreamWriter streamWriter,
|
long maxLatency, int maxIdleTime, StreamWriter streamWriter,
|
||||||
|
|||||||
Reference in New Issue
Block a user