mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-11 18:29:05 +01:00
Merge branch '2291-mailbox-upload-plumbing' into 'master'
Plumbing for mailbox upload worker See merge request briar/briar!1670
This commit is contained in:
@@ -13,6 +13,7 @@ import org.briarproject.bramble.api.plugin.TransportConnectionWriter;
|
||||
import org.briarproject.bramble.api.plugin.TransportId;
|
||||
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
|
||||
import org.briarproject.bramble.api.properties.TransportPropertyManager;
|
||||
import org.briarproject.bramble.api.sync.OutgoingSessionRecord;
|
||||
import org.briarproject.bramble.api.sync.SyncSessionFactory;
|
||||
import org.briarproject.bramble.api.transport.KeyManager;
|
||||
import org.briarproject.bramble.api.transport.StreamReaderFactory;
|
||||
@@ -100,7 +101,16 @@ class ConnectionManagerImpl implements ConnectionManager {
|
||||
TransportConnectionWriter w) {
|
||||
ioExecutor.execute(new OutgoingSimplexSyncConnection(keyManager,
|
||||
connectionRegistry, streamReaderFactory, streamWriterFactory,
|
||||
syncSessionFactory, transportPropertyManager, c, t, w));
|
||||
syncSessionFactory, transportPropertyManager, c, t, w, null));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void manageOutgoingConnection(ContactId c, TransportId t,
|
||||
TransportConnectionWriter w, OutgoingSessionRecord sessionRecord) {
|
||||
ioExecutor.execute(new OutgoingSimplexSyncConnection(keyManager,
|
||||
connectionRegistry, streamReaderFactory, streamWriterFactory,
|
||||
syncSessionFactory, transportPropertyManager, c, t, w,
|
||||
sessionRecord));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -6,6 +6,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.plugin.TransportConnectionWriter;
|
||||
import org.briarproject.bramble.api.plugin.TransportId;
|
||||
import org.briarproject.bramble.api.properties.TransportPropertyManager;
|
||||
import org.briarproject.bramble.api.sync.OutgoingSessionRecord;
|
||||
import org.briarproject.bramble.api.sync.SyncSession;
|
||||
import org.briarproject.bramble.api.sync.SyncSessionFactory;
|
||||
import org.briarproject.bramble.api.transport.KeyManager;
|
||||
@@ -16,6 +17,8 @@ import org.briarproject.bramble.api.transport.StreamWriterFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
import static java.util.logging.Level.WARNING;
|
||||
import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNonNull;
|
||||
import static org.briarproject.bramble.util.LogUtils.logException;
|
||||
@@ -26,6 +29,8 @@ class OutgoingSimplexSyncConnection extends SyncConnection implements Runnable {
|
||||
private final ContactId contactId;
|
||||
private final TransportId transportId;
|
||||
private final TransportConnectionWriter writer;
|
||||
@Nullable
|
||||
private final OutgoingSessionRecord sessionRecord;
|
||||
|
||||
OutgoingSimplexSyncConnection(KeyManager keyManager,
|
||||
ConnectionRegistry connectionRegistry,
|
||||
@@ -34,13 +39,15 @@ class OutgoingSimplexSyncConnection extends SyncConnection implements Runnable {
|
||||
SyncSessionFactory syncSessionFactory,
|
||||
TransportPropertyManager transportPropertyManager,
|
||||
ContactId contactId, TransportId transportId,
|
||||
TransportConnectionWriter writer) {
|
||||
TransportConnectionWriter writer,
|
||||
@Nullable OutgoingSessionRecord sessionRecord) {
|
||||
super(keyManager, connectionRegistry, streamReaderFactory,
|
||||
streamWriterFactory, syncSessionFactory,
|
||||
transportPropertyManager);
|
||||
this.contactId = contactId;
|
||||
this.transportId = transportId;
|
||||
this.writer = writer;
|
||||
this.sessionRecord = sessionRecord;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -71,10 +78,16 @@ class OutgoingSimplexSyncConnection extends SyncConnection implements Runnable {
|
||||
StreamWriter streamWriter = streamWriterFactory.createStreamWriter(
|
||||
w.getOutputStream(), ctx);
|
||||
ContactId c = requireNonNull(ctx.getContactId());
|
||||
// Use eager retransmission if the transport is lossy and cheap
|
||||
return syncSessionFactory.createSimplexOutgoingSession(c,
|
||||
ctx.getTransportId(), w.getMaxLatency(), w.isLossyAndCheap(),
|
||||
streamWriter);
|
||||
if (sessionRecord == null) {
|
||||
// Use eager retransmission if the transport is lossy and cheap
|
||||
return syncSessionFactory.createSimplexOutgoingSession(c,
|
||||
ctx.getTransportId(), w.getMaxLatency(),
|
||||
w.isLossyAndCheap(), streamWriter);
|
||||
} else {
|
||||
return syncSessionFactory.createSimplexOutgoingSession(c,
|
||||
ctx.getTransportId(), w.getMaxLatency(), streamWriter,
|
||||
sessionRecord);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -163,16 +163,11 @@ interface Database<T> {
|
||||
throws DbException;
|
||||
|
||||
/**
|
||||
* Returns true if there are any acks or messages to send to the given
|
||||
* contact over a transport with the given maximum latency.
|
||||
* Returns true if there are any acks to send to the given contact.
|
||||
* <p/>
|
||||
* Read-only.
|
||||
*
|
||||
* @param eager True if messages that are not yet due for retransmission
|
||||
* should be included
|
||||
*/
|
||||
boolean containsAnythingToSend(T txn, ContactId c, long maxLatency,
|
||||
boolean eager) throws DbException;
|
||||
boolean containsAcksToSend(T txn, ContactId c) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns true if the database contains the given contact for the given
|
||||
@@ -212,6 +207,18 @@ interface Database<T> {
|
||||
*/
|
||||
boolean containsMessage(T txn, MessageId m) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns true if there are any messages to send to the given
|
||||
* contact over a transport with the given maximum latency.
|
||||
* <p/>
|
||||
* Read-only.
|
||||
*
|
||||
* @param eager True if messages that are not yet due for retransmission
|
||||
* should be included
|
||||
*/
|
||||
boolean containsMessagesToSend(T txn, ContactId c, long maxLatency,
|
||||
boolean eager) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns true if the database contains the given pending contact.
|
||||
* <p/>
|
||||
|
||||
@@ -342,12 +342,12 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean containsAnythingToSend(Transaction transaction, ContactId c,
|
||||
long maxLatency, boolean eager) throws DbException {
|
||||
public boolean containsAcksToSend(Transaction transaction, ContactId c)
|
||||
throws DbException {
|
||||
T txn = unbox(transaction);
|
||||
if (!db.containsContact(txn, c))
|
||||
throw new NoSuchContactException();
|
||||
return db.containsAnythingToSend(txn, c, maxLatency, eager);
|
||||
return db.containsAcksToSend(txn, c);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -373,6 +373,15 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
||||
return db.containsIdentity(txn, a);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean containsMessagesToSend(Transaction transaction, ContactId c,
|
||||
long maxLatency, boolean eager) throws DbException {
|
||||
T txn = unbox(transaction);
|
||||
if (!db.containsContact(txn, c))
|
||||
throw new NoSuchContactException();
|
||||
return db.containsMessagesToSend(txn, c, maxLatency, eager);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean containsPendingContact(Transaction transaction,
|
||||
PendingContactId p) throws DbException {
|
||||
@@ -1016,7 +1025,8 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
||||
db.getGroupVisibility(txn, id).keySet();
|
||||
db.removeGroup(txn, id);
|
||||
transaction.attach(new GroupRemovedEvent(g));
|
||||
transaction.attach(new GroupVisibilityUpdatedEvent(affected));
|
||||
transaction.attach(new GroupVisibilityUpdatedEvent(INVISIBLE,
|
||||
affected));
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -1141,7 +1151,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
||||
else if (v == INVISIBLE) db.removeGroupVisibility(txn, c, g);
|
||||
else db.setGroupVisibility(txn, c, g, v == SHARED);
|
||||
List<ContactId> affected = singletonList(c);
|
||||
transaction.attach(new GroupVisibilityUpdatedEvent(affected));
|
||||
transaction.attach(new GroupVisibilityUpdatedEvent(v, affected));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -1147,8 +1147,8 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean containsAnythingToSend(Connection txn, ContactId c,
|
||||
long maxLatency, boolean eager) throws DbException {
|
||||
public boolean containsAcksToSend(Connection txn, ContactId c)
|
||||
throws DbException {
|
||||
PreparedStatement ps = null;
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
@@ -1160,34 +1160,7 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
boolean acksToSend = rs.next();
|
||||
rs.close();
|
||||
ps.close();
|
||||
if (acksToSend) return true;
|
||||
if (eager) {
|
||||
sql = "SELECT NULL from statuses"
|
||||
+ " WHERE contactId = ? AND state = ?"
|
||||
+ " AND groupShared = TRUE AND messageShared = TRUE"
|
||||
+ " AND deleted = FALSE AND seen = FALSE";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setInt(1, c.getInt());
|
||||
ps.setInt(2, DELIVERED.getValue());
|
||||
} else {
|
||||
long now = clock.currentTimeMillis();
|
||||
sql = "SELECT NULL FROM statuses"
|
||||
+ " WHERE contactId = ? AND state = ?"
|
||||
+ " AND groupShared = TRUE AND messageShared = TRUE"
|
||||
+ " AND deleted = FALSE AND seen = FALSE"
|
||||
+ " AND (expiry <= ? OR maxLatency IS NULL"
|
||||
+ " OR ? < maxLatency)";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setInt(1, c.getInt());
|
||||
ps.setInt(2, DELIVERED.getValue());
|
||||
ps.setLong(3, now);
|
||||
ps.setLong(4, maxLatency);
|
||||
}
|
||||
rs = ps.executeQuery();
|
||||
boolean messagesToSend = rs.next();
|
||||
rs.close();
|
||||
ps.close();
|
||||
return messagesToSend;
|
||||
return acksToSend;
|
||||
} catch (SQLException e) {
|
||||
tryToClose(rs, LOG, WARNING);
|
||||
tryToClose(ps, LOG, WARNING);
|
||||
@@ -1307,6 +1280,46 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean containsMessagesToSend(Connection txn, ContactId c,
|
||||
long maxLatency, boolean eager) throws DbException {
|
||||
PreparedStatement ps = null;
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
if (eager) {
|
||||
String sql = "SELECT NULL from statuses"
|
||||
+ " WHERE contactId = ? AND state = ?"
|
||||
+ " AND groupShared = TRUE AND messageShared = TRUE"
|
||||
+ " AND deleted = FALSE AND seen = FALSE";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setInt(1, c.getInt());
|
||||
ps.setInt(2, DELIVERED.getValue());
|
||||
} else {
|
||||
long now = clock.currentTimeMillis();
|
||||
String sql = "SELECT NULL FROM statuses"
|
||||
+ " WHERE contactId = ? AND state = ?"
|
||||
+ " AND groupShared = TRUE AND messageShared = TRUE"
|
||||
+ " AND deleted = FALSE AND seen = FALSE"
|
||||
+ " AND (expiry <= ? OR maxLatency IS NULL"
|
||||
+ " OR ? < maxLatency)";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setInt(1, c.getInt());
|
||||
ps.setInt(2, DELIVERED.getValue());
|
||||
ps.setLong(3, now);
|
||||
ps.setLong(4, maxLatency);
|
||||
}
|
||||
rs = ps.executeQuery();
|
||||
boolean messagesToSend = rs.next();
|
||||
rs.close();
|
||||
ps.close();
|
||||
return messagesToSend;
|
||||
} catch (SQLException e) {
|
||||
tryToClose(rs, LOG, WARNING);
|
||||
tryToClose(ps, LOG, WARNING);
|
||||
throw new DbException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean containsPendingContact(Connection txn, PendingContactId p)
|
||||
throws DbException {
|
||||
|
||||
@@ -9,14 +9,12 @@ import org.briarproject.bramble.api.plugin.simplex.SimplexPluginFactory;
|
||||
import javax.annotation.Nullable;
|
||||
import javax.inject.Inject;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.DAYS;
|
||||
import static org.briarproject.bramble.api.mailbox.MailboxConstants.ID;
|
||||
import static org.briarproject.bramble.api.mailbox.MailboxConstants.MAX_LATENCY;
|
||||
|
||||
@NotNullByDefault
|
||||
public class MailboxPluginFactory implements SimplexPluginFactory {
|
||||
|
||||
private static final long MAX_LATENCY = DAYS.toMillis(14);
|
||||
|
||||
@Inject
|
||||
MailboxPluginFactory() {
|
||||
}
|
||||
|
||||
@@ -106,7 +106,8 @@ class RemovableDriveManagerImpl
|
||||
@Override
|
||||
public boolean isWriterTaskNeeded(ContactId c) throws DbException {
|
||||
return db.transactionWithResult(true, txn ->
|
||||
db.containsAnythingToSend(txn, c, MAX_LATENCY, true));
|
||||
db.containsAcksToSend(txn, c) ||
|
||||
db.containsMessagesToSend(txn, c, MAX_LATENCY, true));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -50,6 +50,7 @@ import static java.util.logging.Level.WARNING;
|
||||
import static java.util.logging.Logger.getLogger;
|
||||
import static org.briarproject.bramble.api.lifecycle.LifecycleManager.LifecycleState.STOPPING;
|
||||
import static org.briarproject.bramble.api.record.Record.RECORD_HEADER_BYTES;
|
||||
import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_LENGTH;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.SUPPORTED_VERSIONS;
|
||||
@@ -235,8 +236,10 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
|
||||
generateOffer();
|
||||
} else if (e instanceof GroupVisibilityUpdatedEvent) {
|
||||
GroupVisibilityUpdatedEvent g = (GroupVisibilityUpdatedEvent) e;
|
||||
if (g.getAffectedContacts().contains(contactId))
|
||||
if (g.getVisibility() == SHARED &&
|
||||
g.getAffectedContacts().contains(contactId)) {
|
||||
generateOffer();
|
||||
}
|
||||
} else if (e instanceof MessageRequestedEvent) {
|
||||
if (((MessageRequestedEvent) e).getContactId().equals(contactId))
|
||||
generateBatch();
|
||||
|
||||
@@ -7,9 +7,9 @@ import org.briarproject.bramble.api.event.EventBus;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.plugin.TransportId;
|
||||
import org.briarproject.bramble.api.sync.Ack;
|
||||
import org.briarproject.bramble.api.sync.DeferredSendHandler;
|
||||
import org.briarproject.bramble.api.sync.Message;
|
||||
import org.briarproject.bramble.api.sync.MessageId;
|
||||
import org.briarproject.bramble.api.sync.OutgoingSessionRecord;
|
||||
import org.briarproject.bramble.api.sync.SyncRecordWriter;
|
||||
import org.briarproject.bramble.api.transport.StreamWriter;
|
||||
|
||||
@@ -29,7 +29,7 @@ import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LEN
|
||||
|
||||
/**
|
||||
* A {@link SimplexOutgoingSession} for sending and acking messages via a
|
||||
* mailbox. The session uses a {@link DeferredSendHandler} to record the IDs
|
||||
* mailbox. The session uses a {@link OutgoingSessionRecord} to record the IDs
|
||||
* of the messages sent and acked during the session so that they can be
|
||||
* recorded in the DB as sent or acked after the file has been successfully
|
||||
* uploaded to the mailbox.
|
||||
@@ -41,7 +41,7 @@ class MailboxOutgoingSession extends SimplexOutgoingSession {
|
||||
private static final Logger LOG =
|
||||
getLogger(MailboxOutgoingSession.class.getName());
|
||||
|
||||
private final DeferredSendHandler deferredSendHandler;
|
||||
private final OutgoingSessionRecord sessionRecord;
|
||||
private final long initialCapacity;
|
||||
|
||||
MailboxOutgoingSession(DatabaseComponent db,
|
||||
@@ -51,11 +51,11 @@ class MailboxOutgoingSession extends SimplexOutgoingSession {
|
||||
long maxLatency,
|
||||
StreamWriter streamWriter,
|
||||
SyncRecordWriter recordWriter,
|
||||
DeferredSendHandler deferredSendHandler,
|
||||
OutgoingSessionRecord sessionRecord,
|
||||
long capacity) {
|
||||
super(db, eventBus, contactId, transportId, maxLatency, streamWriter,
|
||||
recordWriter);
|
||||
this.deferredSendHandler = deferredSendHandler;
|
||||
this.sessionRecord = sessionRecord;
|
||||
this.initialCapacity = capacity;
|
||||
}
|
||||
|
||||
@@ -65,7 +65,7 @@ class MailboxOutgoingSession extends SimplexOutgoingSession {
|
||||
Collection<MessageId> idsToAck = loadMessageIdsToAck();
|
||||
if (idsToAck.isEmpty()) break;
|
||||
recordWriter.writeAck(new Ack(idsToAck));
|
||||
deferredSendHandler.onAckSent(idsToAck);
|
||||
sessionRecord.onAckSent(idsToAck);
|
||||
LOG.info("Sent ack");
|
||||
}
|
||||
}
|
||||
@@ -96,7 +96,7 @@ class MailboxOutgoingSession extends SimplexOutgoingSession {
|
||||
db.getMessageToSend(txn, contactId, m, maxLatency, false));
|
||||
if (message == null) continue; // No longer shared
|
||||
recordWriter.writeMessage(message);
|
||||
deferredSendHandler.onMessageSent(m);
|
||||
sessionRecord.onMessageSent(m);
|
||||
LOG.info("Sent message");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import org.briarproject.bramble.api.db.DatabaseExecutor;
|
||||
import org.briarproject.bramble.api.event.EventBus;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.plugin.TransportId;
|
||||
import org.briarproject.bramble.api.sync.OutgoingSessionRecord;
|
||||
import org.briarproject.bramble.api.sync.Priority;
|
||||
import org.briarproject.bramble.api.sync.PriorityHandler;
|
||||
import org.briarproject.bramble.api.sync.SyncRecordReader;
|
||||
@@ -25,6 +26,8 @@ import javax.annotation.Nullable;
|
||||
import javax.annotation.concurrent.Immutable;
|
||||
import javax.inject.Inject;
|
||||
|
||||
import static org.briarproject.bramble.api.mailbox.MailboxConstants.MAX_FILE_PAYLOAD_BYTES;
|
||||
|
||||
@Immutable
|
||||
@NotNullByDefault
|
||||
class SyncSessionFactoryImpl implements SyncSessionFactory {
|
||||
@@ -73,6 +76,18 @@ class SyncSessionFactoryImpl implements SyncSessionFactory {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public SyncSession createSimplexOutgoingSession(ContactId c, TransportId t,
|
||||
long maxLatency, StreamWriter streamWriter,
|
||||
OutgoingSessionRecord sessionRecord) {
|
||||
OutputStream out = streamWriter.getOutputStream();
|
||||
SyncRecordWriter recordWriter =
|
||||
recordWriterFactory.createRecordWriter(out);
|
||||
return new MailboxOutgoingSession(db, eventBus, c, t, maxLatency,
|
||||
streamWriter, recordWriter, sessionRecord,
|
||||
MAX_FILE_PAYLOAD_BYTES);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SyncSession createDuplexOutgoingSession(ContactId c, TransportId t,
|
||||
long maxLatency, int maxIdleTime, StreamWriter streamWriter,
|
||||
|
||||
@@ -303,11 +303,11 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
throws Exception {
|
||||
context.checking(new Expectations() {{
|
||||
// Check whether the contact is in the DB (which it's not)
|
||||
exactly(25).of(database).startTransaction();
|
||||
exactly(27).of(database).startTransaction();
|
||||
will(returnValue(txn));
|
||||
exactly(25).of(database).containsContact(txn, contactId);
|
||||
exactly(27).of(database).containsContact(txn, contactId);
|
||||
will(returnValue(false));
|
||||
exactly(25).of(database).abortTransaction(txn);
|
||||
exactly(27).of(database).abortTransaction(txn);
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
eventExecutor, shutdownManager);
|
||||
@@ -321,6 +321,23 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
// Expected
|
||||
}
|
||||
|
||||
try {
|
||||
db.transaction(true, transaction ->
|
||||
db.containsAcksToSend(transaction, contactId));
|
||||
fail();
|
||||
} catch (NoSuchContactException expected) {
|
||||
// Expected
|
||||
}
|
||||
|
||||
try {
|
||||
db.transaction(true, transaction ->
|
||||
db.containsMessagesToSend(transaction, contactId,
|
||||
123, true));
|
||||
fail();
|
||||
} catch (NoSuchContactException expected) {
|
||||
// Expected
|
||||
}
|
||||
|
||||
try {
|
||||
db.transaction(false, transaction ->
|
||||
db.generateAck(transaction, contactId, 123));
|
||||
|
||||
@@ -378,9 +378,9 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
|
||||
|
||||
// Initially there should be nothing to send
|
||||
assertFalse(
|
||||
db.containsAnythingToSend(txn, contactId, MAX_LATENCY, false));
|
||||
db.containsMessagesToSend(txn, contactId, MAX_LATENCY, false));
|
||||
assertFalse(
|
||||
db.containsAnythingToSend(txn, contactId, MAX_LATENCY, true));
|
||||
db.containsMessagesToSend(txn, contactId, MAX_LATENCY, true));
|
||||
|
||||
// Add some messages to ack
|
||||
Message message1 = getMessage(groupId);
|
||||
@@ -389,10 +389,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
|
||||
db.addMessage(txn, message1, DELIVERED, true, false, contactId);
|
||||
|
||||
// Both message IDs should be returned
|
||||
assertTrue(
|
||||
db.containsAnythingToSend(txn, contactId, MAX_LATENCY, false));
|
||||
assertTrue(
|
||||
db.containsAnythingToSend(txn, contactId, MAX_LATENCY, true));
|
||||
assertTrue(db.containsAcksToSend(txn, contactId));
|
||||
Collection<MessageId> ids = db.getMessagesToAck(txn, contactId, 1234);
|
||||
assertEquals(asList(messageId, messageId1), ids);
|
||||
|
||||
@@ -400,10 +397,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
|
||||
db.lowerAckFlag(txn, contactId, asList(messageId, messageId1));
|
||||
|
||||
// No message IDs should be returned
|
||||
assertFalse(
|
||||
db.containsAnythingToSend(txn, contactId, MAX_LATENCY, false));
|
||||
assertFalse(
|
||||
db.containsAnythingToSend(txn, contactId, MAX_LATENCY, true));
|
||||
assertFalse(db.containsAcksToSend(txn, contactId));
|
||||
assertEquals(emptyList(), db.getMessagesToAck(txn, contactId, 1234));
|
||||
|
||||
// Raise the ack flag again
|
||||
@@ -411,10 +405,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
|
||||
db.raiseAckFlag(txn, contactId, messageId1);
|
||||
|
||||
// Both message IDs should be returned
|
||||
assertTrue(
|
||||
db.containsAnythingToSend(txn, contactId, MAX_LATENCY, false));
|
||||
assertTrue(
|
||||
db.containsAnythingToSend(txn, contactId, MAX_LATENCY, true));
|
||||
assertTrue(db.containsAcksToSend(txn, contactId));
|
||||
ids = db.getMessagesToAck(txn, contactId, 1234);
|
||||
assertEquals(asList(messageId, messageId1), ids);
|
||||
|
||||
@@ -2579,7 +2570,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
|
||||
private void assertNothingToSendLazily(Database<Connection> db,
|
||||
Connection txn) throws Exception {
|
||||
assertFalse(
|
||||
db.containsAnythingToSend(txn, contactId, MAX_LATENCY, false));
|
||||
db.containsMessagesToSend(txn, contactId, MAX_LATENCY, false));
|
||||
Collection<MessageId> ids =
|
||||
db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY);
|
||||
assertTrue(ids.isEmpty());
|
||||
@@ -2590,7 +2581,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
|
||||
private void assertOneMessageToSendLazily(Database<Connection> db,
|
||||
Connection txn) throws Exception {
|
||||
assertTrue(
|
||||
db.containsAnythingToSend(txn, contactId, MAX_LATENCY, false));
|
||||
db.containsMessagesToSend(txn, contactId, MAX_LATENCY, false));
|
||||
Collection<MessageId> ids =
|
||||
db.getMessagesToSend(txn, contactId, ONE_MEGABYTE, MAX_LATENCY);
|
||||
assertEquals(singletonList(messageId), ids);
|
||||
@@ -2601,7 +2592,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
|
||||
private void assertNothingToSendEagerly(Database<Connection> db,
|
||||
Connection txn) throws Exception {
|
||||
assertFalse(
|
||||
db.containsAnythingToSend(txn, contactId, MAX_LATENCY, true));
|
||||
db.containsMessagesToSend(txn, contactId, MAX_LATENCY, true));
|
||||
Collection<MessageId> unacked =
|
||||
db.getUnackedMessagesToSend(txn, contactId);
|
||||
assertTrue(unacked.isEmpty());
|
||||
@@ -2611,7 +2602,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
|
||||
private void assertOneMessageToSendEagerly(Database<Connection> db,
|
||||
Connection txn) throws Exception {
|
||||
assertTrue(
|
||||
db.containsAnythingToSend(txn, contactId, MAX_LATENCY, true));
|
||||
db.containsMessagesToSend(txn, contactId, MAX_LATENCY, true));
|
||||
Collection<MessageId> unacked =
|
||||
db.getUnackedMessagesToSend(txn, contactId);
|
||||
assertEquals(singletonList(messageId), unacked);
|
||||
|
||||
@@ -6,10 +6,10 @@ import org.briarproject.bramble.api.db.Transaction;
|
||||
import org.briarproject.bramble.api.event.EventBus;
|
||||
import org.briarproject.bramble.api.plugin.TransportId;
|
||||
import org.briarproject.bramble.api.sync.Ack;
|
||||
import org.briarproject.bramble.api.sync.DeferredSendHandler;
|
||||
import org.briarproject.bramble.api.sync.GroupId;
|
||||
import org.briarproject.bramble.api.sync.Message;
|
||||
import org.briarproject.bramble.api.sync.MessageId;
|
||||
import org.briarproject.bramble.api.sync.OutgoingSessionRecord;
|
||||
import org.briarproject.bramble.api.sync.SyncRecordWriter;
|
||||
import org.briarproject.bramble.api.sync.Versions;
|
||||
import org.briarproject.bramble.api.transport.StreamWriter;
|
||||
@@ -30,6 +30,7 @@ import static org.briarproject.bramble.test.TestUtils.getContactId;
|
||||
import static org.briarproject.bramble.test.TestUtils.getMessage;
|
||||
import static org.briarproject.bramble.test.TestUtils.getRandomId;
|
||||
import static org.briarproject.bramble.test.TestUtils.getTransportId;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
||||
|
||||
@@ -40,8 +41,6 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
||||
private final StreamWriter streamWriter = context.mock(StreamWriter.class);
|
||||
private final SyncRecordWriter recordWriter =
|
||||
context.mock(SyncRecordWriter.class);
|
||||
private final DeferredSendHandler deferredSendHandler =
|
||||
context.mock(DeferredSendHandler.class);
|
||||
|
||||
private final ContactId contactId = getContactId();
|
||||
private final TransportId transportId = getTransportId();
|
||||
@@ -53,9 +52,10 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
||||
|
||||
@Test
|
||||
public void testNothingToSend() throws Exception {
|
||||
OutgoingSessionRecord sessionRecord = new OutgoingSessionRecord();
|
||||
MailboxOutgoingSession session = new MailboxOutgoingSession(db,
|
||||
eventBus, contactId, transportId, MAX_LATENCY,
|
||||
streamWriter, recordWriter, deferredSendHandler,
|
||||
streamWriter, recordWriter, sessionRecord,
|
||||
MAX_FILE_PAYLOAD_BYTES);
|
||||
|
||||
Transaction noAckIdTxn = new Transaction(null, true);
|
||||
@@ -92,13 +92,17 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
||||
}});
|
||||
|
||||
session.run();
|
||||
|
||||
assertEquals(emptyList(), sessionRecord.getAckedIds());
|
||||
assertEquals(emptyList(), sessionRecord.getSentIds());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSomethingToSend() throws Exception {
|
||||
OutgoingSessionRecord sessionRecord = new OutgoingSessionRecord();
|
||||
MailboxOutgoingSession session = new MailboxOutgoingSession(db,
|
||||
eventBus, contactId, transportId, MAX_LATENCY,
|
||||
streamWriter, recordWriter, deferredSendHandler,
|
||||
streamWriter, recordWriter, sessionRecord,
|
||||
MAX_FILE_PAYLOAD_BYTES);
|
||||
|
||||
Transaction ackIdTxn = new Transaction(null, true);
|
||||
@@ -127,8 +131,6 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
||||
oneOf(recordWriter).getBytesWritten();
|
||||
will(returnValue((long) versionRecordBytes));
|
||||
oneOf(recordWriter).writeAck(with(any(Ack.class)));
|
||||
oneOf(deferredSendHandler)
|
||||
.onAckSent(singletonList(message.getId()));
|
||||
// No more messages to ack
|
||||
oneOf(db).transactionWithResult(with(true),
|
||||
withDbCallable(noAckIdTxn));
|
||||
@@ -150,7 +152,6 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
||||
MAX_LATENCY, false);
|
||||
will(returnValue(message1));
|
||||
oneOf(recordWriter).writeMessage(message1);
|
||||
oneOf(deferredSendHandler).onMessageSent(message1.getId());
|
||||
// Send the end of stream marker
|
||||
oneOf(streamWriter).sendEndOfStream();
|
||||
// Remove listener
|
||||
@@ -158,6 +159,11 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
||||
}});
|
||||
|
||||
session.run();
|
||||
|
||||
assertEquals(singletonList(message.getId()),
|
||||
sessionRecord.getAckedIds());
|
||||
assertEquals(singletonList(message1.getId()),
|
||||
sessionRecord.getSentIds());
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -167,9 +173,10 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
||||
long capacity = RECORD_HEADER_BYTES + MessageId.LENGTH * MAX_MESSAGE_IDS
|
||||
+ RECORD_HEADER_BYTES + MessageId.LENGTH + MessageId.LENGTH - 1;
|
||||
|
||||
OutgoingSessionRecord sessionRecord = new OutgoingSessionRecord();
|
||||
MailboxOutgoingSession session = new MailboxOutgoingSession(db,
|
||||
eventBus, contactId, transportId, MAX_LATENCY,
|
||||
streamWriter, recordWriter, deferredSendHandler, capacity);
|
||||
streamWriter, recordWriter, sessionRecord, capacity);
|
||||
|
||||
Transaction ackIdTxn1 = new Transaction(null, true);
|
||||
Transaction ackIdTxn2 = new Transaction(null, true);
|
||||
@@ -184,6 +191,9 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
||||
}
|
||||
List<MessageId> idsInSecondAck =
|
||||
singletonList(new MessageId(getRandomId()));
|
||||
List<MessageId> allIds = new ArrayList<>(MAX_MESSAGE_IDS + 1);
|
||||
allIds.addAll(idsInFirstAck);
|
||||
allIds.addAll(idsInSecondAck);
|
||||
|
||||
context.checking(new DbExpectations() {{
|
||||
// Add listener
|
||||
@@ -200,7 +210,6 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
||||
will(returnValue(idsInFirstAck));
|
||||
// Send the first ack record
|
||||
oneOf(recordWriter).writeAck(with(any(Ack.class)));
|
||||
oneOf(deferredSendHandler).onAckSent(idsInFirstAck);
|
||||
// Calculate remaining capacity for acks
|
||||
oneOf(recordWriter).getBytesWritten();
|
||||
will(returnValue((long) versionRecordBytes + firstAckRecordBytes));
|
||||
@@ -211,7 +220,6 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
||||
will(returnValue(idsInSecondAck));
|
||||
// Send the second ack record
|
||||
oneOf(recordWriter).writeAck(with(any(Ack.class)));
|
||||
oneOf(deferredSendHandler).onAckSent(idsInSecondAck);
|
||||
// Not enough capacity left for another ack
|
||||
oneOf(recordWriter).getBytesWritten();
|
||||
will(returnValue((long) versionRecordBytes + firstAckRecordBytes
|
||||
@@ -227,5 +235,8 @@ public class MailboxOutgoingSessionTest extends BrambleMockTestCase {
|
||||
}});
|
||||
|
||||
session.run();
|
||||
|
||||
assertEquals(allIds, sessionRecord.getAckedIds());
|
||||
assertEquals(emptyList(), sessionRecord.getSentIds());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user