mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-15 04:18:53 +01:00
Merge branch '2226-defer-marking-messages-and-acks-as-sent' into 'master'
Defer marking messages and acks as sent Closes #2296 See merge request briar/briar!1635
This commit is contained in:
@@ -406,6 +406,12 @@ interface Database<T> {
|
||||
Collection<MessageId> getMessageIds(T txn, GroupId g, Metadata query)
|
||||
throws DbException;
|
||||
|
||||
/**
|
||||
* Returns the length of the given message in bytes, including the
|
||||
* message header.
|
||||
*/
|
||||
int getMessageLength(T txn, MessageId m) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns the metadata for all delivered messages in the given group.
|
||||
* <p/>
|
||||
@@ -496,7 +502,8 @@ interface Database<T> {
|
||||
|
||||
/**
|
||||
* Returns the IDs of some messages that are eligible to be sent to the
|
||||
* given contact, up to the given total length.
|
||||
* given contact. The total length of the messages including record headers
|
||||
* will be no more than the given capacity.
|
||||
* <p/>
|
||||
* Unlike {@link #getUnackedMessagesToSend(Object, ContactId)} this method
|
||||
* does not return messages that have already been sent unless they are
|
||||
@@ -504,20 +511,20 @@ interface Database<T> {
|
||||
* <p/>
|
||||
* Read-only.
|
||||
*/
|
||||
Collection<MessageId> getMessagesToSend(T txn, ContactId c, int maxLength,
|
||||
Collection<MessageId> getMessagesToSend(T txn, ContactId c, long capacity,
|
||||
long maxLatency) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns the IDs of all messages that are eligible to be sent to the
|
||||
* given contact, together with their raw lengths.
|
||||
* given contact.
|
||||
* <p/>
|
||||
* Unlike {@link #getMessagesToSend(Object, ContactId, int, long)} this
|
||||
* Unlike {@link #getMessagesToSend(Object, ContactId, long, long)} this
|
||||
* method may return messages that have already been sent and are not yet
|
||||
* due for retransmission.
|
||||
* <p/>
|
||||
* Read-only.
|
||||
*/
|
||||
Map<MessageId, Integer> getUnackedMessagesToSend(T txn, ContactId c)
|
||||
Collection<MessageId> getUnackedMessagesToSend(T txn, ContactId c)
|
||||
throws DbException;
|
||||
|
||||
/**
|
||||
@@ -598,13 +605,14 @@ interface Database<T> {
|
||||
|
||||
/**
|
||||
* Returns the IDs of some messages that are eligible to be sent to the
|
||||
* given contact and have been requested by the contact, up to the given
|
||||
* total length.
|
||||
* given contact and have been requested by the contact. The total length
|
||||
* of the messages including record headers will be no more than the given
|
||||
* capacity.
|
||||
* <p/>
|
||||
* Read-only.
|
||||
*/
|
||||
Collection<MessageId> getRequestedMessagesToSend(T txn, ContactId c,
|
||||
int maxLength, long maxLatency) throws DbException;
|
||||
long capacity, long maxLatency) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns all settings in the given namespace.
|
||||
|
||||
@@ -75,7 +75,6 @@ import org.briarproject.bramble.api.transport.TransportKeys;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
@@ -87,6 +86,7 @@ import javax.annotation.Nullable;
|
||||
import javax.annotation.concurrent.ThreadSafe;
|
||||
import javax.inject.Inject;
|
||||
|
||||
import static java.util.Collections.singletonList;
|
||||
import static java.util.logging.Level.WARNING;
|
||||
import static java.util.logging.Logger.getLogger;
|
||||
import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE;
|
||||
@@ -424,13 +424,14 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
||||
@Nullable
|
||||
@Override
|
||||
public Collection<Message> generateBatch(Transaction transaction,
|
||||
ContactId c, int maxLength, long maxLatency) throws DbException {
|
||||
ContactId c, long capacity, long maxLatency) throws DbException {
|
||||
if (transaction.isReadOnly()) throw new IllegalArgumentException();
|
||||
T txn = unbox(transaction);
|
||||
if (!db.containsContact(txn, c))
|
||||
throw new NoSuchContactException();
|
||||
Collection<MessageId> ids =
|
||||
db.getMessagesToSend(txn, c, maxLength, maxLatency);
|
||||
db.getMessagesToSend(txn, c, capacity, maxLatency);
|
||||
if (ids.isEmpty()) return null;
|
||||
long totalLength = 0;
|
||||
List<Message> messages = new ArrayList<>(ids.size());
|
||||
for (MessageId m : ids) {
|
||||
@@ -439,38 +440,11 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
||||
messages.add(message);
|
||||
db.updateRetransmissionData(txn, c, m, maxLatency);
|
||||
}
|
||||
if (ids.isEmpty()) return null;
|
||||
db.lowerRequestedFlag(txn, c, ids);
|
||||
transaction.attach(new MessagesSentEvent(c, ids, totalLength));
|
||||
return messages;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<Message> generateBatch(Transaction transaction,
|
||||
ContactId c, Collection<MessageId> ids, long maxLatency)
|
||||
throws DbException {
|
||||
if (transaction.isReadOnly()) throw new IllegalArgumentException();
|
||||
T txn = unbox(transaction);
|
||||
if (!db.containsContact(txn, c))
|
||||
throw new NoSuchContactException();
|
||||
long totalLength = 0;
|
||||
List<Message> messages = new ArrayList<>(ids.size());
|
||||
List<MessageId> sentIds = new ArrayList<>(ids.size());
|
||||
for (MessageId m : ids) {
|
||||
if (db.containsVisibleMessage(txn, c, m)) {
|
||||
Message message = db.getMessage(txn, m);
|
||||
totalLength += message.getRawLength();
|
||||
messages.add(message);
|
||||
sentIds.add(m);
|
||||
db.updateRetransmissionData(txn, c, m, maxLatency);
|
||||
}
|
||||
}
|
||||
if (messages.isEmpty()) return messages;
|
||||
db.lowerRequestedFlag(txn, c, sentIds);
|
||||
transaction.attach(new MessagesSentEvent(c, sentIds, totalLength));
|
||||
return messages;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public Offer generateOffer(Transaction transaction, ContactId c,
|
||||
@@ -505,13 +479,14 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
||||
@Nullable
|
||||
@Override
|
||||
public Collection<Message> generateRequestedBatch(Transaction transaction,
|
||||
ContactId c, int maxLength, long maxLatency) throws DbException {
|
||||
ContactId c, long capacity, long maxLatency) throws DbException {
|
||||
if (transaction.isReadOnly()) throw new IllegalArgumentException();
|
||||
T txn = unbox(transaction);
|
||||
if (!db.containsContact(txn, c))
|
||||
throw new NoSuchContactException();
|
||||
Collection<MessageId> ids =
|
||||
db.getRequestedMessagesToSend(txn, c, maxLength, maxLatency);
|
||||
db.getRequestedMessagesToSend(txn, c, capacity, maxLatency);
|
||||
if (ids.isEmpty()) return null;
|
||||
long totalLength = 0;
|
||||
List<Message> messages = new ArrayList<>(ids.size());
|
||||
for (MessageId m : ids) {
|
||||
@@ -520,7 +495,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
||||
messages.add(message);
|
||||
db.updateRetransmissionData(txn, c, m, maxLatency);
|
||||
}
|
||||
if (ids.isEmpty()) return null;
|
||||
db.lowerRequestedFlag(txn, c, ids);
|
||||
transaction.attach(new MessagesSentEvent(c, ids, totalLength));
|
||||
return messages;
|
||||
@@ -635,6 +609,24 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
||||
return db.getMessageIds(txn, g, query);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<MessageId> getMessagesToAck(Transaction transaction,
|
||||
ContactId c, int maxMessages) throws DbException {
|
||||
T txn = unbox(transaction);
|
||||
if (!db.containsContact(txn, c))
|
||||
throw new NoSuchContactException();
|
||||
return db.getMessagesToAck(txn, c, maxMessages);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<MessageId> getMessagesToSend(Transaction transaction,
|
||||
ContactId c, long capacity, long maxLatency) throws DbException {
|
||||
T txn = unbox(transaction);
|
||||
if (!db.containsContact(txn, c))
|
||||
throw new NoSuchContactException();
|
||||
return db.getMessagesToSend(txn, c, capacity, maxLatency);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<MessageId> getMessagesToValidate(Transaction transaction)
|
||||
throws DbException {
|
||||
@@ -740,10 +732,29 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
||||
return status;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public Map<MessageId, Integer> getUnackedMessagesToSend(
|
||||
Transaction transaction,
|
||||
ContactId c) throws DbException {
|
||||
public Message getMessageToSend(Transaction transaction, ContactId c,
|
||||
MessageId m, long maxLatency, boolean markAsSent)
|
||||
throws DbException {
|
||||
if (transaction.isReadOnly()) throw new IllegalArgumentException();
|
||||
T txn = unbox(transaction);
|
||||
if (!db.containsContact(txn, c))
|
||||
throw new NoSuchContactException();
|
||||
if (!db.containsVisibleMessage(txn, c, m)) return null;
|
||||
Message message = db.getMessage(txn, m);
|
||||
if (markAsSent) {
|
||||
db.updateRetransmissionData(txn, c, m, maxLatency);
|
||||
db.lowerRequestedFlag(txn, c, singletonList(m));
|
||||
transaction.attach(new MessagesSentEvent(c, singletonList(m),
|
||||
message.getRawLength()));
|
||||
}
|
||||
return message;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<MessageId> getUnackedMessagesToSend(
|
||||
Transaction transaction, ContactId c) throws DbException {
|
||||
T txn = unbox(transaction);
|
||||
if (!db.containsContact(txn, c))
|
||||
throw new NoSuchContactException();
|
||||
@@ -1069,6 +1080,20 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
||||
db.removeTransportKeys(txn, t, k);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAckSent(Transaction transaction, ContactId c,
|
||||
Collection<MessageId> acked) throws DbException {
|
||||
if (transaction.isReadOnly()) throw new IllegalArgumentException();
|
||||
T txn = unbox(transaction);
|
||||
if (!db.containsContact(txn, c))
|
||||
throw new NoSuchContactException();
|
||||
List<MessageId> visible = new ArrayList<>(acked.size());
|
||||
for (MessageId m : acked) {
|
||||
if (db.containsVisibleMessage(txn, c, m)) visible.add(m);
|
||||
}
|
||||
db.lowerAckFlag(txn, c, visible);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setCleanupTimerDuration(Transaction transaction, MessageId m,
|
||||
long duration) throws DbException {
|
||||
@@ -1115,7 +1140,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
||||
if (old == INVISIBLE) db.addGroupVisibility(txn, c, g, v == SHARED);
|
||||
else if (v == INVISIBLE) db.removeGroupVisibility(txn, c, g);
|
||||
else db.setGroupVisibility(txn, c, g, v == SHARED);
|
||||
List<ContactId> affected = Collections.singletonList(c);
|
||||
List<ContactId> affected = singletonList(c);
|
||||
transaction.attach(new GroupVisibilityUpdatedEvent(affected));
|
||||
}
|
||||
|
||||
@@ -1163,6 +1188,28 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
||||
transaction.attach(new MessageStateChangedEvent(m, false, state));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMessagesSent(Transaction transaction, ContactId c,
|
||||
Collection<MessageId> sent, long maxLatency) throws DbException {
|
||||
if (transaction.isReadOnly()) throw new IllegalArgumentException();
|
||||
T txn = unbox(transaction);
|
||||
if (!db.containsContact(txn, c))
|
||||
throw new NoSuchContactException();
|
||||
long totalLength = 0;
|
||||
List<MessageId> visible = new ArrayList<>(sent.size());
|
||||
for (MessageId m : sent) {
|
||||
if (db.containsVisibleMessage(txn, c, m)) {
|
||||
visible.add(m);
|
||||
totalLength += db.getMessageLength(txn, m);
|
||||
db.updateRetransmissionData(txn, c, m, maxLatency);
|
||||
}
|
||||
}
|
||||
db.lowerRequestedFlag(txn, c, visible);
|
||||
if (!visible.isEmpty()) {
|
||||
transaction.attach(new MessagesSentEvent(c, visible, totalLength));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addMessageDependencies(Transaction transaction,
|
||||
Message dependent, Collection<MessageId> dependencies)
|
||||
|
||||
@@ -51,7 +51,6 @@ import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -77,6 +76,7 @@ import static java.util.logging.Logger.getLogger;
|
||||
import static org.briarproject.bramble.api.db.DatabaseComponent.NO_CLEANUP_DEADLINE;
|
||||
import static org.briarproject.bramble.api.db.DatabaseComponent.TIMER_NOT_STARTED;
|
||||
import static org.briarproject.bramble.api.db.Metadata.REMOVE;
|
||||
import static org.briarproject.bramble.api.record.Record.RECORD_HEADER_BYTES;
|
||||
import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE;
|
||||
import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED;
|
||||
import static org.briarproject.bramble.api.sync.Group.Visibility.VISIBLE;
|
||||
@@ -1908,6 +1908,31 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMessageLength(Connection txn, MessageId m)
|
||||
throws DbException {
|
||||
PreparedStatement ps = null;
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
String sql = "SELECT length from messages"
|
||||
+ " WHERE messageId = ? AND state = ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setBytes(1, m.getBytes());
|
||||
ps.setInt(2, DELIVERED.getValue());
|
||||
rs = ps.executeQuery();
|
||||
if (!rs.next()) throw new DbStateException();
|
||||
int length = rs.getInt(1);
|
||||
if (rs.next()) throw new DbStateException();
|
||||
rs.close();
|
||||
ps.close();
|
||||
return length;
|
||||
} catch (SQLException e) {
|
||||
tryToClose(rs, LOG, WARNING);
|
||||
tryToClose(ps, LOG, WARNING);
|
||||
throw new DbException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<MessageId, Metadata> getMessageMetadata(Connection txn,
|
||||
GroupId g) throws DbException {
|
||||
@@ -2256,8 +2281,8 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<MessageId> getMessagesToSend(Connection txn, ContactId c,
|
||||
int maxLength, long maxLatency) throws DbException {
|
||||
public Collection<MessageId> getMessagesToSend(Connection txn,
|
||||
ContactId c, long capacity, long maxLatency) throws DbException {
|
||||
long now = clock.currentTimeMillis();
|
||||
PreparedStatement ps = null;
|
||||
ResultSet rs = null;
|
||||
@@ -2277,12 +2302,11 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
ps.setLong(4, maxLatency);
|
||||
rs = ps.executeQuery();
|
||||
List<MessageId> ids = new ArrayList<>();
|
||||
int total = 0;
|
||||
while (rs.next()) {
|
||||
int length = rs.getInt(1);
|
||||
if (total + length > maxLength) break;
|
||||
if (capacity < RECORD_HEADER_BYTES + length) break;
|
||||
ids.add(new MessageId(rs.getBytes(2)));
|
||||
total += length;
|
||||
capacity -= RECORD_HEADER_BYTES + length;
|
||||
}
|
||||
rs.close();
|
||||
ps.close();
|
||||
@@ -2295,12 +2319,12 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<MessageId, Integer> getUnackedMessagesToSend(Connection txn,
|
||||
public Collection<MessageId> getUnackedMessagesToSend(Connection txn,
|
||||
ContactId c) throws DbException {
|
||||
PreparedStatement ps = null;
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
String sql = "SELECT length, messageId FROM statuses"
|
||||
String sql = "SELECT messageId FROM statuses"
|
||||
+ " WHERE contactId = ? AND state = ?"
|
||||
+ " AND groupShared = TRUE AND messageShared = TRUE"
|
||||
+ " AND deleted = FALSE AND seen = FALSE"
|
||||
@@ -2309,15 +2333,11 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
ps.setInt(1, c.getInt());
|
||||
ps.setInt(2, DELIVERED.getValue());
|
||||
rs = ps.executeQuery();
|
||||
Map<MessageId, Integer> results = new LinkedHashMap<>();
|
||||
while (rs.next()) {
|
||||
int length = rs.getInt(1);
|
||||
MessageId id = new MessageId(rs.getBytes(2));
|
||||
results.put(id, length);
|
||||
}
|
||||
List<MessageId> ids = new ArrayList<>();
|
||||
while (rs.next()) ids.add(new MessageId(rs.getBytes(1)));
|
||||
rs.close();
|
||||
ps.close();
|
||||
return results;
|
||||
return ids;
|
||||
} catch (SQLException e) {
|
||||
tryToClose(rs, LOG, WARNING);
|
||||
tryToClose(ps, LOG, WARNING);
|
||||
@@ -2430,6 +2450,7 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
MessageId m = new MessageId(rs.getBytes(1));
|
||||
GroupId g = new GroupId(rs.getBytes(2));
|
||||
Collection<MessageId> messageIds = ids.get(g);
|
||||
//noinspection Java8MapApi
|
||||
if (messageIds == null) {
|
||||
messageIds = new ArrayList<>();
|
||||
ids.put(g, messageIds);
|
||||
@@ -2556,7 +2577,7 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
|
||||
@Override
|
||||
public Collection<MessageId> getRequestedMessagesToSend(Connection txn,
|
||||
ContactId c, int maxLength, long maxLatency) throws DbException {
|
||||
ContactId c, long capacity, long maxLatency) throws DbException {
|
||||
long now = clock.currentTimeMillis();
|
||||
PreparedStatement ps = null;
|
||||
ResultSet rs = null;
|
||||
@@ -2576,12 +2597,11 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
ps.setLong(4, maxLatency);
|
||||
rs = ps.executeQuery();
|
||||
List<MessageId> ids = new ArrayList<>();
|
||||
int total = 0;
|
||||
while (rs.next()) {
|
||||
int length = rs.getInt(1);
|
||||
if (total + length > maxLength) break;
|
||||
if (capacity < RECORD_HEADER_BYTES + length) break;
|
||||
ids.add(new MessageId(rs.getBytes(2)));
|
||||
total += length;
|
||||
capacity -= RECORD_HEADER_BYTES + length;
|
||||
}
|
||||
rs.close();
|
||||
ps.close();
|
||||
@@ -2735,6 +2755,7 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
ContactId c = new ContactId(rs.getInt(1));
|
||||
TransportId t = new TransportId(rs.getString(2));
|
||||
Collection<TransportId> transportIds = ids.get(c);
|
||||
//noinspection Java8MapApi
|
||||
if (transportIds == null) {
|
||||
transportIds = new ArrayList<>();
|
||||
ids.put(c, transportIds);
|
||||
|
||||
@@ -19,6 +19,8 @@ class RecordWriterImpl implements RecordWriter {
|
||||
private final OutputStream out;
|
||||
private final byte[] header = new byte[RECORD_HEADER_BYTES];
|
||||
|
||||
private long bytesWritten = 0;
|
||||
|
||||
RecordWriterImpl(OutputStream out) {
|
||||
this.out = out;
|
||||
}
|
||||
@@ -31,6 +33,7 @@ class RecordWriterImpl implements RecordWriter {
|
||||
ByteUtils.writeUint16(payload.length, header, 2);
|
||||
out.write(header);
|
||||
out.write(payload);
|
||||
bytesWritten += RECORD_HEADER_BYTES + payload.length;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -42,4 +45,9 @@ class RecordWriterImpl implements RecordWriter {
|
||||
public void close() throws IOException {
|
||||
out.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getBytesWritten() {
|
||||
return bytesWritten;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,11 +13,13 @@ import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.plugin.TransportId;
|
||||
import org.briarproject.bramble.api.plugin.event.TransportInactiveEvent;
|
||||
import org.briarproject.bramble.api.record.Record;
|
||||
import org.briarproject.bramble.api.sync.Ack;
|
||||
import org.briarproject.bramble.api.sync.Message;
|
||||
import org.briarproject.bramble.api.sync.Offer;
|
||||
import org.briarproject.bramble.api.sync.Priority;
|
||||
import org.briarproject.bramble.api.sync.Request;
|
||||
import org.briarproject.bramble.api.sync.SyncConstants;
|
||||
import org.briarproject.bramble.api.sync.SyncRecordWriter;
|
||||
import org.briarproject.bramble.api.sync.SyncSession;
|
||||
import org.briarproject.bramble.api.sync.Versions;
|
||||
@@ -47,8 +49,9 @@ import static java.util.logging.Level.INFO;
|
||||
import static java.util.logging.Level.WARNING;
|
||||
import static java.util.logging.Logger.getLogger;
|
||||
import static org.briarproject.bramble.api.lifecycle.LifecycleManager.LifecycleState.STOPPING;
|
||||
import static org.briarproject.bramble.api.record.Record.MAX_RECORD_PAYLOAD_BYTES;
|
||||
import static org.briarproject.bramble.api.record.Record.RECORD_HEADER_BYTES;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_LENGTH;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.SUPPORTED_VERSIONS;
|
||||
import static org.briarproject.bramble.util.LogUtils.logException;
|
||||
|
||||
@@ -71,6 +74,16 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
|
||||
NEXT_SEND_TIME_DECREASED = () -> {
|
||||
};
|
||||
|
||||
/**
|
||||
* The batch capacity must be at least {@link Record#RECORD_HEADER_BYTES}
|
||||
* + {@link SyncConstants#MAX_MESSAGE_LENGTH} to ensure that maximum-size
|
||||
* messages can be selected for transmission. Larger batches will mean
|
||||
* fewer round-trips between the DB and the output stream, but each
|
||||
* round-trip will block the DB for longer.
|
||||
*/
|
||||
private static final int BATCH_CAPACITY =
|
||||
(RECORD_HEADER_BYTES + MAX_MESSAGE_LENGTH) * 2;
|
||||
|
||||
private final DatabaseComponent db;
|
||||
private final Executor dbExecutor;
|
||||
private final EventBus eventBus;
|
||||
@@ -296,8 +309,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
|
||||
db.transactionWithNullableResult(false, txn -> {
|
||||
Collection<Message> batch =
|
||||
db.generateRequestedBatch(txn, contactId,
|
||||
MAX_RECORD_PAYLOAD_BYTES,
|
||||
maxLatency);
|
||||
BATCH_CAPACITY, maxLatency);
|
||||
setNextSendTime(db.getNextSendTime(txn, contactId));
|
||||
return batch;
|
||||
});
|
||||
|
||||
@@ -0,0 +1,66 @@
|
||||
package org.briarproject.bramble.sync;
|
||||
|
||||
import org.briarproject.bramble.api.contact.ContactId;
|
||||
import org.briarproject.bramble.api.db.DatabaseComponent;
|
||||
import org.briarproject.bramble.api.db.DbException;
|
||||
import org.briarproject.bramble.api.event.EventBus;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.plugin.TransportId;
|
||||
import org.briarproject.bramble.api.sync.Message;
|
||||
import org.briarproject.bramble.api.sync.MessageId;
|
||||
import org.briarproject.bramble.api.sync.SyncRecordWriter;
|
||||
import org.briarproject.bramble.api.transport.StreamWriter;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import javax.annotation.concurrent.ThreadSafe;
|
||||
|
||||
import static java.util.logging.Level.INFO;
|
||||
import static java.util.logging.Logger.getLogger;
|
||||
|
||||
/**
|
||||
* A {@link SimplexOutgoingSession} that sends messages eagerly, ie
|
||||
* regardless of whether they're due for retransmission.
|
||||
*/
|
||||
@ThreadSafe
|
||||
@NotNullByDefault
|
||||
class EagerSimplexOutgoingSession extends SimplexOutgoingSession {
|
||||
|
||||
private static final Logger LOG =
|
||||
getLogger(EagerSimplexOutgoingSession.class.getName());
|
||||
|
||||
EagerSimplexOutgoingSession(DatabaseComponent db,
|
||||
EventBus eventBus,
|
||||
ContactId contactId,
|
||||
TransportId transportId,
|
||||
long maxLatency,
|
||||
StreamWriter streamWriter,
|
||||
SyncRecordWriter recordWriter) {
|
||||
super(db, eventBus, contactId, transportId, maxLatency, streamWriter,
|
||||
recordWriter);
|
||||
}
|
||||
|
||||
@Override
|
||||
void sendMessages() throws DbException, IOException {
|
||||
for (MessageId m : loadUnackedMessageIdsToSend()) {
|
||||
if (isInterrupted()) break;
|
||||
Message message = db.transactionWithNullableResult(false, txn ->
|
||||
db.getMessageToSend(txn, contactId, m, maxLatency, true));
|
||||
if (message == null) continue; // No longer shared
|
||||
recordWriter.writeMessage(message);
|
||||
LOG.info("Sent message");
|
||||
}
|
||||
}
|
||||
|
||||
private Collection<MessageId> loadUnackedMessageIdsToSend()
|
||||
throws DbException {
|
||||
Collection<MessageId> ids = db.transactionWithResult(true, txn ->
|
||||
db.getUnackedMessagesToSend(txn, contactId));
|
||||
if (LOG.isLoggable(INFO)) {
|
||||
LOG.info(ids.size() + " unacked messages to send");
|
||||
}
|
||||
return ids;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,117 @@
|
||||
package org.briarproject.bramble.sync;
|
||||
|
||||
import org.briarproject.bramble.api.contact.ContactId;
|
||||
import org.briarproject.bramble.api.db.DatabaseComponent;
|
||||
import org.briarproject.bramble.api.db.DbException;
|
||||
import org.briarproject.bramble.api.event.EventBus;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.plugin.TransportId;
|
||||
import org.briarproject.bramble.api.sync.Ack;
|
||||
import org.briarproject.bramble.api.sync.DeferredSendHandler;
|
||||
import org.briarproject.bramble.api.sync.Message;
|
||||
import org.briarproject.bramble.api.sync.MessageId;
|
||||
import org.briarproject.bramble.api.sync.SyncRecordWriter;
|
||||
import org.briarproject.bramble.api.transport.StreamWriter;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import javax.annotation.concurrent.ThreadSafe;
|
||||
|
||||
import static java.lang.Math.min;
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.logging.Level.INFO;
|
||||
import static java.util.logging.Logger.getLogger;
|
||||
import static org.briarproject.bramble.api.record.Record.RECORD_HEADER_BYTES;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
|
||||
|
||||
/**
|
||||
* A {@link SimplexOutgoingSession} for sending and acking messages via a
|
||||
* mailbox. The session uses a {@link DeferredSendHandler} to record the IDs
|
||||
* of the messages sent and acked during the session so that they can be
|
||||
* recorded in the DB as sent or acked after the file has been successfully
|
||||
* uploaded to the mailbox.
|
||||
*/
|
||||
@ThreadSafe
|
||||
@NotNullByDefault
|
||||
class MailboxOutgoingSession extends SimplexOutgoingSession {
|
||||
|
||||
private static final Logger LOG =
|
||||
getLogger(MailboxOutgoingSession.class.getName());
|
||||
|
||||
private final DeferredSendHandler deferredSendHandler;
|
||||
private final long initialCapacity;
|
||||
|
||||
MailboxOutgoingSession(DatabaseComponent db,
|
||||
EventBus eventBus,
|
||||
ContactId contactId,
|
||||
TransportId transportId,
|
||||
long maxLatency,
|
||||
StreamWriter streamWriter,
|
||||
SyncRecordWriter recordWriter,
|
||||
DeferredSendHandler deferredSendHandler,
|
||||
long capacity) {
|
||||
super(db, eventBus, contactId, transportId, maxLatency, streamWriter,
|
||||
recordWriter);
|
||||
this.deferredSendHandler = deferredSendHandler;
|
||||
this.initialCapacity = capacity;
|
||||
}
|
||||
|
||||
@Override
|
||||
void sendAcks() throws DbException, IOException {
|
||||
while (!isInterrupted()) {
|
||||
Collection<MessageId> idsToAck = loadMessageIdsToAck();
|
||||
if (idsToAck.isEmpty()) break;
|
||||
recordWriter.writeAck(new Ack(idsToAck));
|
||||
deferredSendHandler.onAckSent(idsToAck);
|
||||
LOG.info("Sent ack");
|
||||
}
|
||||
}
|
||||
|
||||
private Collection<MessageId> loadMessageIdsToAck() throws DbException {
|
||||
long idCapacity = (getRemainingCapacity() - RECORD_HEADER_BYTES)
|
||||
/ MessageId.LENGTH;
|
||||
if (idCapacity <= 0) return emptyList(); // Out of capacity
|
||||
int maxMessageIds = (int) min(idCapacity, MAX_MESSAGE_IDS);
|
||||
Collection<MessageId> ids = db.transactionWithResult(true, txn ->
|
||||
db.getMessagesToAck(txn, contactId, maxMessageIds));
|
||||
if (LOG.isLoggable(INFO)) {
|
||||
LOG.info(ids.size() + " messages to ack");
|
||||
}
|
||||
return ids;
|
||||
}
|
||||
|
||||
private long getRemainingCapacity() {
|
||||
return initialCapacity - recordWriter.getBytesWritten();
|
||||
}
|
||||
|
||||
@Override
|
||||
void sendMessages() throws DbException, IOException {
|
||||
for (MessageId m : loadMessageIdsToSend()) {
|
||||
if (isInterrupted()) break;
|
||||
// Defer marking the message as sent
|
||||
Message message = db.transactionWithNullableResult(true, txn ->
|
||||
db.getMessageToSend(txn, contactId, m, maxLatency, false));
|
||||
if (message == null) continue; // No longer shared
|
||||
recordWriter.writeMessage(message);
|
||||
deferredSendHandler.onMessageSent(m);
|
||||
LOG.info("Sent message");
|
||||
}
|
||||
}
|
||||
|
||||
private Collection<MessageId> loadMessageIdsToSend() throws DbException {
|
||||
long capacity = getRemainingCapacity();
|
||||
if (capacity < RECORD_HEADER_BYTES + MESSAGE_HEADER_LENGTH) {
|
||||
return emptyList(); // Out of capacity
|
||||
}
|
||||
Collection<MessageId> ids = db.transactionWithResult(true, txn ->
|
||||
db.getMessagesToSend(txn, contactId, capacity, maxLatency));
|
||||
if (LOG.isLoggable(INFO)) {
|
||||
LOG.info(ids.size() + " messages to send");
|
||||
}
|
||||
return ids;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -3,7 +3,6 @@ package org.briarproject.bramble.sync;
|
||||
import org.briarproject.bramble.api.contact.ContactId;
|
||||
import org.briarproject.bramble.api.contact.event.ContactRemovedEvent;
|
||||
import org.briarproject.bramble.api.db.DatabaseComponent;
|
||||
import org.briarproject.bramble.api.db.DatabaseExecutor;
|
||||
import org.briarproject.bramble.api.db.DbException;
|
||||
import org.briarproject.bramble.api.event.Event;
|
||||
import org.briarproject.bramble.api.event.EventBus;
|
||||
@@ -13,9 +12,10 @@ import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.plugin.TransportId;
|
||||
import org.briarproject.bramble.api.plugin.event.TransportInactiveEvent;
|
||||
import org.briarproject.bramble.api.record.Record;
|
||||
import org.briarproject.bramble.api.sync.Ack;
|
||||
import org.briarproject.bramble.api.sync.Message;
|
||||
import org.briarproject.bramble.api.sync.MessageId;
|
||||
import org.briarproject.bramble.api.sync.SyncConstants;
|
||||
import org.briarproject.bramble.api.sync.SyncRecordWriter;
|
||||
import org.briarproject.bramble.api.sync.SyncSession;
|
||||
import org.briarproject.bramble.api.sync.Versions;
|
||||
@@ -23,15 +23,7 @@ import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent;
|
||||
import org.briarproject.bramble.api.transport.StreamWriter;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import javax.annotation.concurrent.ThreadSafe;
|
||||
@@ -40,8 +32,9 @@ import static java.util.logging.Level.INFO;
|
||||
import static java.util.logging.Level.WARNING;
|
||||
import static java.util.logging.Logger.getLogger;
|
||||
import static org.briarproject.bramble.api.lifecycle.LifecycleManager.LifecycleState.STOPPING;
|
||||
import static org.briarproject.bramble.api.record.Record.MAX_RECORD_PAYLOAD_BYTES;
|
||||
import static org.briarproject.bramble.api.record.Record.RECORD_HEADER_BYTES;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_LENGTH;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.SUPPORTED_VERSIONS;
|
||||
import static org.briarproject.bramble.util.LogUtils.logException;
|
||||
|
||||
@@ -57,38 +50,40 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
|
||||
private static final Logger LOG =
|
||||
getLogger(SimplexOutgoingSession.class.getName());
|
||||
|
||||
private static final ThrowingRunnable<IOException> CLOSE = () -> {
|
||||
};
|
||||
/**
|
||||
* The batch capacity must be at least {@link Record#RECORD_HEADER_BYTES}
|
||||
* + {@link SyncConstants#MAX_MESSAGE_LENGTH} to ensure that maximum-size
|
||||
* messages can be selected for transmission. Larger batches will mean
|
||||
* fewer round-trips between the DB and the output stream, but each
|
||||
* round-trip will block the DB for longer.
|
||||
*/
|
||||
static final int BATCH_CAPACITY =
|
||||
(RECORD_HEADER_BYTES + MAX_MESSAGE_LENGTH) * 2;
|
||||
|
||||
private final DatabaseComponent db;
|
||||
private final Executor dbExecutor;
|
||||
private final EventBus eventBus;
|
||||
private final ContactId contactId;
|
||||
private final TransportId transportId;
|
||||
private final long maxLatency;
|
||||
private final boolean eager;
|
||||
private final StreamWriter streamWriter;
|
||||
private final SyncRecordWriter recordWriter;
|
||||
private final AtomicInteger outstandingQueries;
|
||||
private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
|
||||
protected final DatabaseComponent db;
|
||||
protected final EventBus eventBus;
|
||||
protected final ContactId contactId;
|
||||
protected final TransportId transportId;
|
||||
protected final long maxLatency;
|
||||
protected final StreamWriter streamWriter;
|
||||
protected final SyncRecordWriter recordWriter;
|
||||
|
||||
private volatile boolean interrupted = false;
|
||||
|
||||
SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
|
||||
EventBus eventBus, ContactId contactId, TransportId transportId,
|
||||
long maxLatency, boolean eager, StreamWriter streamWriter,
|
||||
SimplexOutgoingSession(DatabaseComponent db,
|
||||
EventBus eventBus,
|
||||
ContactId contactId,
|
||||
TransportId transportId,
|
||||
long maxLatency,
|
||||
StreamWriter streamWriter,
|
||||
SyncRecordWriter recordWriter) {
|
||||
this.db = db;
|
||||
this.dbExecutor = dbExecutor;
|
||||
this.eventBus = eventBus;
|
||||
this.contactId = contactId;
|
||||
this.transportId = transportId;
|
||||
this.maxLatency = maxLatency;
|
||||
this.eager = eager;
|
||||
this.streamWriter = streamWriter;
|
||||
this.recordWriter = recordWriter;
|
||||
outstandingQueries = new AtomicInteger(2); // One per type of record
|
||||
writerTasks = new LinkedBlockingQueue<>();
|
||||
}
|
||||
|
||||
@IoExecutor
|
||||
@@ -98,22 +93,13 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
|
||||
try {
|
||||
// Send our supported protocol versions
|
||||
recordWriter.writeVersions(new Versions(SUPPORTED_VERSIONS));
|
||||
// Start a query for each type of record
|
||||
dbExecutor.execute(this::generateAck);
|
||||
if (eager) dbExecutor.execute(this::loadUnackedMessageIds);
|
||||
else dbExecutor.execute(this::generateBatch);
|
||||
// Write records until interrupted or no more records to write
|
||||
try {
|
||||
while (!interrupted) {
|
||||
ThrowingRunnable<IOException> task = writerTasks.take();
|
||||
if (task == CLOSE) break;
|
||||
task.run();
|
||||
}
|
||||
streamWriter.sendEndOfStream();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Interrupted while waiting for a record to write");
|
||||
Thread.currentThread().interrupt();
|
||||
sendAcks();
|
||||
sendMessages();
|
||||
} catch (DbException e) {
|
||||
logException(LOG, WARNING, e);
|
||||
}
|
||||
streamWriter.sendEndOfStream();
|
||||
} finally {
|
||||
eventBus.removeListener(this);
|
||||
}
|
||||
@@ -122,11 +108,10 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
|
||||
@Override
|
||||
public void interrupt() {
|
||||
interrupted = true;
|
||||
writerTasks.add(CLOSE);
|
||||
}
|
||||
|
||||
private void decrementOutstandingQueries() {
|
||||
if (outstandingQueries.decrementAndGet() == 0) writerTasks.add(CLOSE);
|
||||
boolean isInterrupted() {
|
||||
return interrupted;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -146,110 +131,33 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
|
||||
}
|
||||
}
|
||||
|
||||
@DatabaseExecutor
|
||||
private void loadUnackedMessageIds() {
|
||||
if (interrupted) return;
|
||||
try {
|
||||
Map<MessageId, Integer> ids = db.transactionWithResult(true, txn ->
|
||||
db.getUnackedMessagesToSend(txn, contactId));
|
||||
if (LOG.isLoggable(INFO)) {
|
||||
LOG.info(ids.size() + " unacked messages to send");
|
||||
}
|
||||
if (ids.isEmpty()) decrementOutstandingQueries();
|
||||
else dbExecutor.execute(() -> generateEagerBatch(ids));
|
||||
} catch (DbException e) {
|
||||
logException(LOG, WARNING, e);
|
||||
interrupt();
|
||||
}
|
||||
void sendAcks() throws DbException, IOException {
|
||||
while (!isInterrupted()) if (!generateAndSendAck()) break;
|
||||
}
|
||||
|
||||
@DatabaseExecutor
|
||||
private void generateEagerBatch(Map<MessageId, Integer> ids) {
|
||||
if (interrupted) return;
|
||||
// Take some message IDs from `ids` to form a batch
|
||||
Collection<MessageId> batchIds = new ArrayList<>();
|
||||
long totalLength = 0;
|
||||
Iterator<Entry<MessageId, Integer>> it = ids.entrySet().iterator();
|
||||
while (it.hasNext()) {
|
||||
// Check whether the next message will fit in the batch
|
||||
Entry<MessageId, Integer> e = it.next();
|
||||
int length = e.getValue();
|
||||
if (totalLength + length > MAX_RECORD_PAYLOAD_BYTES) break;
|
||||
// Add the message to the batch
|
||||
it.remove();
|
||||
batchIds.add(e.getKey());
|
||||
totalLength += length;
|
||||
}
|
||||
if (batchIds.isEmpty()) throw new AssertionError();
|
||||
try {
|
||||
Collection<Message> batch =
|
||||
db.transactionWithResult(false, txn ->
|
||||
db.generateBatch(txn, contactId, batchIds,
|
||||
maxLatency));
|
||||
writerTasks.add(() -> writeEagerBatch(batch, ids));
|
||||
} catch (DbException e) {
|
||||
logException(LOG, WARNING, e);
|
||||
interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
@IoExecutor
|
||||
private void writeEagerBatch(Collection<Message> batch,
|
||||
Map<MessageId, Integer> ids) throws IOException {
|
||||
if (interrupted) return;
|
||||
for (Message m : batch) recordWriter.writeMessage(m);
|
||||
LOG.info("Sent eager batch");
|
||||
if (ids.isEmpty()) decrementOutstandingQueries();
|
||||
else dbExecutor.execute(() -> generateEagerBatch(ids));
|
||||
}
|
||||
|
||||
@DatabaseExecutor
|
||||
private void generateAck() {
|
||||
if (interrupted) return;
|
||||
try {
|
||||
Ack a = db.transactionWithNullableResult(false, txn ->
|
||||
db.generateAck(txn, contactId, MAX_MESSAGE_IDS));
|
||||
if (LOG.isLoggable(INFO))
|
||||
LOG.info("Generated ack: " + (a != null));
|
||||
if (a == null) decrementOutstandingQueries();
|
||||
else writerTasks.add(() -> writeAck(a));
|
||||
} catch (DbException e) {
|
||||
logException(LOG, WARNING, e);
|
||||
interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
@IoExecutor
|
||||
private void writeAck(Ack ack) throws IOException {
|
||||
if (interrupted) return;
|
||||
recordWriter.writeAck(ack);
|
||||
private boolean generateAndSendAck() throws DbException, IOException {
|
||||
Ack a = db.transactionWithNullableResult(false, txn ->
|
||||
db.generateAck(txn, contactId, MAX_MESSAGE_IDS));
|
||||
if (LOG.isLoggable(INFO))
|
||||
LOG.info("Generated ack: " + (a != null));
|
||||
if (a == null) return false; // No more acks to send
|
||||
recordWriter.writeAck(a);
|
||||
LOG.info("Sent ack");
|
||||
dbExecutor.execute(this::generateAck);
|
||||
return true;
|
||||
}
|
||||
|
||||
@DatabaseExecutor
|
||||
private void generateBatch() {
|
||||
if (interrupted) return;
|
||||
try {
|
||||
Collection<Message> b =
|
||||
db.transactionWithNullableResult(false, txn ->
|
||||
db.generateBatch(txn, contactId,
|
||||
MAX_RECORD_PAYLOAD_BYTES, maxLatency));
|
||||
if (LOG.isLoggable(INFO))
|
||||
LOG.info("Generated batch: " + (b != null));
|
||||
if (b == null) decrementOutstandingQueries();
|
||||
else writerTasks.add(() -> writeBatch(b));
|
||||
} catch (DbException e) {
|
||||
logException(LOG, WARNING, e);
|
||||
interrupt();
|
||||
}
|
||||
void sendMessages() throws DbException, IOException {
|
||||
while (!isInterrupted()) if (!generateAndSendBatch()) break;
|
||||
}
|
||||
|
||||
@IoExecutor
|
||||
private void writeBatch(Collection<Message> batch) throws IOException {
|
||||
if (interrupted) return;
|
||||
for (Message m : batch) recordWriter.writeMessage(m);
|
||||
private boolean generateAndSendBatch() throws DbException, IOException {
|
||||
Collection<Message> b = db.transactionWithNullableResult(false, txn ->
|
||||
db.generateBatch(txn, contactId, BATCH_CAPACITY, maxLatency));
|
||||
if (LOG.isLoggable(INFO))
|
||||
LOG.info("Generated batch: " + (b != null));
|
||||
if (b == null) return false; // No more messages to send
|
||||
for (Message m : b) recordWriter.writeMessage(m);
|
||||
LOG.info("Sent batch");
|
||||
dbExecutor.execute(this::generateBatch);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -85,4 +85,9 @@ class SyncRecordWriterImpl implements SyncRecordWriter {
|
||||
public void flush() throws IOException {
|
||||
writer.flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getBytesWritten() {
|
||||
return writer.getBytesWritten();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -64,8 +64,13 @@ class SyncSessionFactoryImpl implements SyncSessionFactory {
|
||||
OutputStream out = streamWriter.getOutputStream();
|
||||
SyncRecordWriter recordWriter =
|
||||
recordWriterFactory.createRecordWriter(out);
|
||||
return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, t,
|
||||
maxLatency, eager, streamWriter, recordWriter);
|
||||
if (eager) {
|
||||
return new EagerSimplexOutgoingSession(db, eventBus, c, t,
|
||||
maxLatency, streamWriter, recordWriter);
|
||||
} else {
|
||||
return new SimplexOutgoingSession(db, eventBus, c, t,
|
||||
maxLatency, streamWriter, recordWriter);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user