mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-15 12:19:54 +01:00
Merge branch 'stream-writer-interface' into 'master'
Send end of stream marker when sync session finishes See merge request akwizgran/briar!790
This commit is contained in:
@@ -30,6 +30,7 @@ import org.briarproject.bramble.api.record.RecordWriter;
|
||||
import org.briarproject.bramble.api.record.RecordWriterFactory;
|
||||
import org.briarproject.bramble.api.system.Clock;
|
||||
import org.briarproject.bramble.api.transport.StreamReaderFactory;
|
||||
import org.briarproject.bramble.api.transport.StreamWriter;
|
||||
import org.briarproject.bramble.api.transport.StreamWriterFactory;
|
||||
|
||||
import java.io.EOFException;
|
||||
@@ -152,11 +153,11 @@ class ContactExchangeTaskImpl extends Thread implements ContactExchangeTask {
|
||||
recordReaderFactory.createRecordReader(streamReader);
|
||||
|
||||
// Create the writers
|
||||
OutputStream streamWriter =
|
||||
StreamWriter streamWriter =
|
||||
streamWriterFactory.createContactExchangeStreamWriter(out,
|
||||
alice ? aliceHeaderKey : bobHeaderKey);
|
||||
RecordWriter recordWriter =
|
||||
recordWriterFactory.createRecordWriter(streamWriter);
|
||||
recordWriterFactory.createRecordWriter(streamWriter.getOutputStream());
|
||||
|
||||
// Derive the nonces to be signed
|
||||
byte[] aliceNonce = crypto.mac(ALICE_NONCE_LABEL, masterSecret,
|
||||
@@ -184,8 +185,8 @@ class ContactExchangeTaskImpl extends Thread implements ContactExchangeTask {
|
||||
localSignature, localTimestamp);
|
||||
recordWriter.flush();
|
||||
}
|
||||
// Close the outgoing stream
|
||||
recordWriter.close();
|
||||
// Send EOF on the outgoing stream
|
||||
streamWriter.sendEndOfStream();
|
||||
// Skip any remaining records from the incoming stream
|
||||
try {
|
||||
while (true) recordReader.readRecord();
|
||||
|
||||
@@ -14,12 +14,12 @@ import org.briarproject.bramble.api.sync.SyncSessionFactory;
|
||||
import org.briarproject.bramble.api.transport.KeyManager;
|
||||
import org.briarproject.bramble.api.transport.StreamContext;
|
||||
import org.briarproject.bramble.api.transport.StreamReaderFactory;
|
||||
import org.briarproject.bramble.api.transport.StreamWriter;
|
||||
import org.briarproject.bramble.api.transport.StreamWriterFactory;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
@@ -101,7 +101,7 @@ class ConnectionManagerImpl implements ConnectionManager {
|
||||
|
||||
private SyncSession createSimplexOutgoingSession(StreamContext ctx,
|
||||
TransportConnectionWriter w) throws IOException {
|
||||
OutputStream streamWriter = streamWriterFactory.createStreamWriter(
|
||||
StreamWriter streamWriter = streamWriterFactory.createStreamWriter(
|
||||
w.getOutputStream(), ctx);
|
||||
return syncSessionFactory.createSimplexOutgoingSession(
|
||||
ctx.getContactId(), w.getMaxLatency(), streamWriter);
|
||||
@@ -109,7 +109,7 @@ class ConnectionManagerImpl implements ConnectionManager {
|
||||
|
||||
private SyncSession createDuplexOutgoingSession(StreamContext ctx,
|
||||
TransportConnectionWriter w) throws IOException {
|
||||
OutputStream streamWriter = streamWriterFactory.createStreamWriter(
|
||||
StreamWriter streamWriter = streamWriterFactory.createStreamWriter(
|
||||
w.getOutputStream(), ctx);
|
||||
return syncSessionFactory.createDuplexOutgoingSession(
|
||||
ctx.getContactId(), w.getMaxLatency(), w.getMaxIdleTime(),
|
||||
@@ -300,8 +300,8 @@ class ConnectionManagerImpl implements ConnectionManager {
|
||||
}
|
||||
|
||||
private void disposeReader(boolean exception, boolean recognised) {
|
||||
if (exception && outgoingSession != null)
|
||||
outgoingSession.interrupt();
|
||||
// Interrupt the outgoing session so it finishes cleanly
|
||||
if (outgoingSession != null) outgoingSession.interrupt();
|
||||
try {
|
||||
reader.dispose(exception, recognised);
|
||||
} catch (IOException e) {
|
||||
@@ -310,6 +310,8 @@ class ConnectionManagerImpl implements ConnectionManager {
|
||||
}
|
||||
|
||||
private void disposeWriter(boolean exception) {
|
||||
// Interrupt the incoming session if an exception occurred,
|
||||
// otherwise wait for the end of stream marker
|
||||
if (exception && incomingSession != null)
|
||||
incomingSession.interrupt();
|
||||
try {
|
||||
@@ -407,8 +409,8 @@ class ConnectionManagerImpl implements ConnectionManager {
|
||||
}
|
||||
|
||||
private void disposeReader(boolean exception, boolean recognised) {
|
||||
if (exception && outgoingSession != null)
|
||||
outgoingSession.interrupt();
|
||||
// Interrupt the outgoing session so it finishes cleanly
|
||||
if (outgoingSession != null) outgoingSession.interrupt();
|
||||
try {
|
||||
reader.dispose(exception, recognised);
|
||||
} catch (IOException e) {
|
||||
@@ -417,6 +419,8 @@ class ConnectionManagerImpl implements ConnectionManager {
|
||||
}
|
||||
|
||||
private void disposeWriter(boolean exception) {
|
||||
// Interrupt the incoming session if an exception occurred,
|
||||
// otherwise wait for the end of stream marker
|
||||
if (exception && incomingSession != null)
|
||||
incomingSession.interrupt();
|
||||
try {
|
||||
|
||||
@@ -106,7 +106,7 @@ class ConnectionRegistryImpl implements ConnectionRegistry {
|
||||
if (m == null) return Collections.emptyList();
|
||||
List<ContactId> ids = new ArrayList<>(m.keySet());
|
||||
if (LOG.isLoggable(INFO))
|
||||
LOG.info(ids.size() + " contacts connected");
|
||||
LOG.info(ids.size() + " contacts connected: " + t);
|
||||
return ids;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
|
||||
@@ -23,6 +23,7 @@ import org.briarproject.bramble.api.sync.event.MessageSharedEvent;
|
||||
import org.briarproject.bramble.api.sync.event.MessageToAckEvent;
|
||||
import org.briarproject.bramble.api.sync.event.MessageToRequestEvent;
|
||||
import org.briarproject.bramble.api.system.Clock;
|
||||
import org.briarproject.bramble.api.transport.StreamWriter;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
@@ -67,6 +68,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
|
||||
private final Clock clock;
|
||||
private final ContactId contactId;
|
||||
private final int maxLatency, maxIdleTime;
|
||||
private final StreamWriter streamWriter;
|
||||
private final SyncRecordWriter recordWriter;
|
||||
private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
|
||||
|
||||
@@ -81,7 +83,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
|
||||
|
||||
DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
|
||||
EventBus eventBus, Clock clock, ContactId contactId, int maxLatency,
|
||||
int maxIdleTime, SyncRecordWriter recordWriter) {
|
||||
int maxIdleTime, StreamWriter streamWriter,
|
||||
SyncRecordWriter recordWriter) {
|
||||
this.db = db;
|
||||
this.dbExecutor = dbExecutor;
|
||||
this.eventBus = eventBus;
|
||||
@@ -89,6 +92,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
|
||||
this.contactId = contactId;
|
||||
this.maxLatency = maxLatency;
|
||||
this.maxIdleTime = maxIdleTime;
|
||||
this.streamWriter = streamWriter;
|
||||
this.recordWriter = recordWriter;
|
||||
writerTasks = new LinkedBlockingQueue<>();
|
||||
}
|
||||
@@ -149,7 +153,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
|
||||
dataToFlush = true;
|
||||
}
|
||||
}
|
||||
if (dataToFlush) recordWriter.flush();
|
||||
streamWriter.sendEndOfStream();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Interrupted while waiting for a record to write");
|
||||
Thread.currentThread().interrupt();
|
||||
|
||||
@@ -63,7 +63,11 @@ class IncomingSession implements SyncSession, EventListener {
|
||||
eventBus.addListener(this);
|
||||
try {
|
||||
// Read records until interrupted or EOF
|
||||
while (!interrupted && !recordReader.eof()) {
|
||||
while (!interrupted) {
|
||||
if (recordReader.eof()) {
|
||||
LOG.info("End of stream");
|
||||
return;
|
||||
}
|
||||
if (recordReader.hasAck()) {
|
||||
Ack a = recordReader.readAck();
|
||||
dbExecutor.execute(new ReceiveAck(a));
|
||||
|
||||
@@ -15,6 +15,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.sync.Ack;
|
||||
import org.briarproject.bramble.api.sync.SyncRecordWriter;
|
||||
import org.briarproject.bramble.api.sync.SyncSession;
|
||||
import org.briarproject.bramble.api.transport.StreamWriter;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
@@ -51,6 +52,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
|
||||
private final EventBus eventBus;
|
||||
private final ContactId contactId;
|
||||
private final int maxLatency;
|
||||
private final StreamWriter streamWriter;
|
||||
private final SyncRecordWriter recordWriter;
|
||||
private final AtomicInteger outstandingQueries;
|
||||
private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
|
||||
@@ -58,13 +60,14 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
|
||||
private volatile boolean interrupted = false;
|
||||
|
||||
SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
|
||||
EventBus eventBus, ContactId contactId,
|
||||
int maxLatency, SyncRecordWriter recordWriter) {
|
||||
EventBus eventBus, ContactId contactId, int maxLatency,
|
||||
StreamWriter streamWriter, SyncRecordWriter recordWriter) {
|
||||
this.db = db;
|
||||
this.dbExecutor = dbExecutor;
|
||||
this.eventBus = eventBus;
|
||||
this.contactId = contactId;
|
||||
this.maxLatency = maxLatency;
|
||||
this.streamWriter = streamWriter;
|
||||
this.recordWriter = recordWriter;
|
||||
outstandingQueries = new AtomicInteger(2); // One per type of record
|
||||
writerTasks = new LinkedBlockingQueue<>();
|
||||
@@ -85,7 +88,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
|
||||
if (task == CLOSE) break;
|
||||
task.run();
|
||||
}
|
||||
recordWriter.flush();
|
||||
streamWriter.sendEndOfStream();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Interrupted while waiting for a record to write");
|
||||
Thread.currentThread().interrupt();
|
||||
|
||||
@@ -12,6 +12,7 @@ import org.briarproject.bramble.api.sync.SyncRecordWriterFactory;
|
||||
import org.briarproject.bramble.api.sync.SyncSession;
|
||||
import org.briarproject.bramble.api.sync.SyncSessionFactory;
|
||||
import org.briarproject.bramble.api.system.Clock;
|
||||
import org.briarproject.bramble.api.transport.StreamWriter;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
@@ -53,19 +54,21 @@ class SyncSessionFactoryImpl implements SyncSessionFactory {
|
||||
|
||||
@Override
|
||||
public SyncSession createSimplexOutgoingSession(ContactId c,
|
||||
int maxLatency, OutputStream out) {
|
||||
int maxLatency, StreamWriter streamWriter) {
|
||||
OutputStream out = streamWriter.getOutputStream();
|
||||
SyncRecordWriter recordWriter =
|
||||
recordWriterFactory.createRecordWriter(out);
|
||||
return new SimplexOutgoingSession(db, dbExecutor, eventBus, c,
|
||||
maxLatency, recordWriter);
|
||||
maxLatency, streamWriter, recordWriter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency,
|
||||
int maxIdleTime, OutputStream out) {
|
||||
int maxIdleTime, StreamWriter streamWriter) {
|
||||
OutputStream out = streamWriter.getOutputStream();
|
||||
SyncRecordWriter recordWriter =
|
||||
recordWriterFactory.createRecordWriter(out);
|
||||
return new DuplexOutgoingSession(db, dbExecutor, eventBus, clock, c,
|
||||
maxLatency, maxIdleTime, recordWriter);
|
||||
maxLatency, maxIdleTime, streamWriter, recordWriter);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import org.briarproject.bramble.api.crypto.SecretKey;
|
||||
import org.briarproject.bramble.api.crypto.StreamEncrypterFactory;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.transport.StreamContext;
|
||||
import org.briarproject.bramble.api.transport.StreamWriter;
|
||||
import org.briarproject.bramble.api.transport.StreamWriterFactory;
|
||||
|
||||
import java.io.OutputStream;
|
||||
@@ -23,14 +24,14 @@ class StreamWriterFactoryImpl implements StreamWriterFactory {
|
||||
}
|
||||
|
||||
@Override
|
||||
public OutputStream createStreamWriter(OutputStream out,
|
||||
public StreamWriter createStreamWriter(OutputStream out,
|
||||
StreamContext ctx) {
|
||||
return new StreamWriterImpl(
|
||||
streamEncrypterFactory.createStreamEncrypter(out, ctx));
|
||||
}
|
||||
|
||||
@Override
|
||||
public OutputStream createContactExchangeStreamWriter(OutputStream out,
|
||||
public StreamWriter createContactExchangeStreamWriter(OutputStream out,
|
||||
SecretKey headerKey) {
|
||||
return new StreamWriterImpl(
|
||||
streamEncrypterFactory.createContactExchangeStreamDecrypter(out,
|
||||
|
||||
@@ -2,6 +2,7 @@ package org.briarproject.bramble.transport;
|
||||
|
||||
import org.briarproject.bramble.api.crypto.StreamEncrypter;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.transport.StreamWriter;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
@@ -17,7 +18,7 @@ import static org.briarproject.bramble.api.transport.TransportConstants.MAX_PAYL
|
||||
*/
|
||||
@NotThreadSafe
|
||||
@NotNullByDefault
|
||||
class StreamWriterImpl extends OutputStream {
|
||||
class StreamWriterImpl extends OutputStream implements StreamWriter {
|
||||
|
||||
private final StreamEncrypter encrypter;
|
||||
private final byte[] payload;
|
||||
@@ -29,6 +30,17 @@ class StreamWriterImpl extends OutputStream {
|
||||
payload = new byte[MAX_PAYLOAD_LENGTH];
|
||||
}
|
||||
|
||||
@Override
|
||||
public OutputStream getOutputStream() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendEndOfStream() throws IOException {
|
||||
writeFrame(true);
|
||||
encrypter.flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
writeFrame(true);
|
||||
|
||||
Reference in New Issue
Block a user