[core] delete conversation messages belonging to completed introduction sessions

A session is completed if it returned to the START state
and if all sent messages have been ACKed by the receiver.

The session's metadata is kept in case the user restarts the session
by doing another introduction.
This commit is contained in:
Torsten Grote
2019-10-08 12:46:19 -03:00
parent ea810c817b
commit 9660ff2fff
6 changed files with 523 additions and 54 deletions

View File

@@ -29,6 +29,11 @@ enum IntroduceeState implements State {
return value;
}
@Override
public boolean isComplete() {
return this == START;
}
static IntroduceeState fromValue(int value) throws FormatException {
for (IntroduceeState s : values()) if (s.value == value) return s;
throw new FormatException();

View File

@@ -29,6 +29,11 @@ enum IntroducerState implements State {
return value;
}
@Override
public boolean isComplete() {
return this == START;
}
static IntroducerState fromValue(int value) throws FormatException {
for (IntroducerState s : values()) if (s.value == value) return s;
throw new FormatException();

View File

@@ -42,6 +42,7 @@ import org.briarproject.briar.introduction.IntroducerSession.Introducee;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -319,7 +320,7 @@ class IntroductionManagerImpl extends ConversationClientImpl
if (ss == null) return true;
IntroducerSession session =
sessionParser.parseIntroducerSession(ss.bdfSession);
return session.getState() == START;
return session.getState().isComplete();
}
@Override
@@ -561,8 +562,98 @@ class IntroductionManagerImpl extends ConversationClientImpl
@Override
public boolean deleteAllMessages(Transaction txn, ContactId c)
throws DbException {
// TODO actually delete messages (#1627 and #1629)
return getMessageIds(txn, c).size() == 0;
// get ID of the contact group
GroupId g = getContactGroup(db.getContact(txn, c)).getId();
// get metadata for all messages in the group
Map<MessageId, BdfDictionary> messages;
try {
messages = clientHelper.getMessageMetadataAsDictionary(txn, g);
} catch (FormatException e) {
throw new DbException(e);
}
// assign protocol messages to their sessions
Map<SessionId, DeletableSession> sessions = new HashMap<>();
for (Entry<MessageId, BdfDictionary> entry : messages.entrySet()) {
MessageMetadata m;
try {
m = messageParser.parseMetadata(entry.getValue());
} catch (FormatException e) {
throw new DbException(e);
}
if (m.getSessionId() == null) {
// this can only be an unhandled REQUEST message
// that does not yet have a SessionId assigned
continue;
}
// get session from map or database
DeletableSession session = sessions.get(m.getSessionId());
if (session == null) {
session = getDeletableSession(txn, g, m.getSessionId());
sessions.put(m.getSessionId(), session);
}
session.messages.add(entry.getKey());
}
// get a set of all messages which were not ACKed by the contact
Set<MessageId> notAcked = new HashSet<>();
for (MessageStatus status : db.getMessageStatus(txn, c, g)) {
if (!status.isSeen()) notAcked.add(status.getMessageId());
}
return deleteCompletedSessions(txn, sessions, notAcked);
}
private DeletableSession getDeletableSession(Transaction txn,
GroupId introducerGroupId, SessionId sessionId) throws DbException {
try {
StoredSession ss = getSession(txn, sessionId);
if (ss == null) throw new AssertionError();
Session s;
Role role = sessionParser.getRole(ss.bdfSession);
if (role == INTRODUCER) {
s = sessionParser.parseIntroducerSession(ss.bdfSession);
} else if (role == INTRODUCEE) {
s = sessionParser.parseIntroduceeSession(introducerGroupId,
ss.bdfSession);
} else throw new AssertionError();
return new DeletableSession(s.getState());
} catch (FormatException e) {
throw new DbException(e);
}
}
private boolean deleteCompletedSessions(Transaction txn,
Map<SessionId, DeletableSession> sessions, Set<MessageId> notAcked)
throws DbException {
// find completed sessions to delete
boolean allDeleted = true;
for (DeletableSession session : sessions.values()) {
if (!session.state.isComplete()) {
allDeleted = false;
continue;
}
// we can only delete sessions
// where delivery of all messages was confirmed (aka ACKed)
boolean allAcked = true;
for (MessageId m : session.messages) {
if (notAcked.contains(m)) {
allAcked = false;
allDeleted = false;
break;
}
}
// delete messages of session, if all were ACKed
if (allAcked) {
for (MessageId m : session.messages) {
db.deleteMessage(txn, m);
db.deleteMessageMetadata(txn, m);
}
// we can not delete the session as it might get restarted
// and then needs the previous MessageIds
}
}
return allDeleted;
}
private Set<MessageId> getMessageIds(Transaction txn, ContactId c)
@@ -589,4 +680,14 @@ class IntroductionManagerImpl extends ConversationClientImpl
}
}
private static class DeletableSession {
private final State state;
private final List<MessageId> messages = new ArrayList<>();
private DeletableSession(State state) {
this.state = state;
}
}
}

View File

@@ -4,4 +4,6 @@ interface State {
int getValue();
boolean isComplete();
}

View File

@@ -142,14 +142,10 @@ public class IntroductionIntegrationTest
assertGroupCount(messageTracker0, g2.getId(), 1, 0);
// check that request message states are correct
Collection<ConversationMessageHeader> messages =
db0.transactionWithResult(true, txn -> introductionManager0
.getMessageHeaders(txn, contactId1From0));
Collection<ConversationMessageHeader> messages = getMessages1From0();
assertEquals(1, messages.size());
assertMessageState(messages.iterator().next(), true, false, false);
messages =
db0.transactionWithResult(true, txn -> introductionManager0
.getMessageHeaders(txn, contactId2From0));
messages = getMessages2From0();
assertEquals(1, messages.size());
assertMessageState(messages.iterator().next(), true, false, false);
@@ -162,9 +158,7 @@ public class IntroductionIntegrationTest
assertGroupCount(messageTracker1, g1.getId(), 2, 1);
// check that accept message state is correct
messages =
db1.transactionWithResult(true, txn -> introductionManager1
.getMessageHeaders(txn, contactId0From1));
messages = getMessages0From1();
assertEquals(2, messages.size());
for (ConversationMessageHeader h : messages) {
if (h instanceof ConversationResponse) {
@@ -327,22 +321,17 @@ public class IntroductionIntegrationTest
Group g1 = introductionManager0.getContactGroup(introducee1);
Group g2 = introductionManager0.getContactGroup(introducee2);
Collection<ConversationMessageHeader> messages =
db0.transactionWithResult(true, txn -> introductionManager0
.getMessageHeaders(txn, contactId1From0));
Collection<ConversationMessageHeader> messages = getMessages1From0();
assertEquals(2, messages.size());
assertGroupCount(messageTracker0, g1.getId(), 2, 1);
messages = db0.transactionWithResult(true, txn ->
introductionManager0.getMessageHeaders(txn, contactId2From0));
messages = getMessages2From0();
assertEquals(2, messages.size());
assertGroupCount(messageTracker0, g2.getId(), 2, 1);
messages = db1.transactionWithResult(true, txn ->
introductionManager1.getMessageHeaders(txn, contactId0From1));
messages = getMessages0From1();
assertEquals(2, messages.size());
assertGroupCount(messageTracker1, g1.getId(), 2, 1);
// introducee2 should also have the decline response of introducee1
messages = db2.transactionWithResult(true, txn ->
introductionManager2.getMessageHeaders(txn, contactId0From2));
messages = getMessages0From2();
assertEquals(3, messages.size());
assertGroupCount(messageTracker2, g2.getId(), 3, 2);
@@ -394,18 +383,13 @@ public class IntroductionIntegrationTest
assertFalse(contactManager2
.contactExists(author1.getId(), author2.getId()));
Collection<ConversationMessageHeader> messages =
db0.transactionWithResult(true, txn -> introductionManager0
.getMessageHeaders(txn, contactId1From0));
Collection<ConversationMessageHeader> messages = getMessages1From0();
assertEquals(2, messages.size());
messages = db0.transactionWithResult(true, txn ->
introductionManager0.getMessageHeaders(txn, contactId2From0));
messages = getMessages2From0();
assertEquals(2, messages.size());
messages = db1.transactionWithResult(true, txn ->
introductionManager1.getMessageHeaders(txn, contactId0From1));
messages = getMessages0From1();
assertEquals(3, messages.size());
messages = db2.transactionWithResult(true, txn ->
introductionManager2.getMessageHeaders(txn, contactId0From2));
messages = getMessages0From2();
assertEquals(2, messages.size());
assertFalse(listener0.aborted);
assertFalse(listener1.aborted);
@@ -597,21 +581,13 @@ public class IntroductionIntegrationTest
Group g1 = introductionManager0.getContactGroup(introducee1);
Group g2 = introductionManager0.getContactGroup(introducee2);
assertEquals(2, db0.transactionWithResult(true, txn ->
introductionManager0.getMessageHeaders(txn, contactId1From0))
.size());
assertEquals(2, getMessages1From0().size());
assertGroupCount(messageTracker0, g1.getId(), 2, 1);
assertEquals(2, db0.transactionWithResult(true, txn ->
introductionManager0.getMessageHeaders(txn, contactId2From0))
.size());
assertEquals(2, getMessages2From0().size());
assertGroupCount(messageTracker0, g2.getId(), 2, 1);
assertEquals(2, db1.transactionWithResult(true, txn ->
introductionManager1.getMessageHeaders(txn, contactId0From1))
.size());
assertEquals(2, getMessages0From1().size());
assertGroupCount(messageTracker1, g1.getId(), 2, 1);
assertEquals(3, db2.transactionWithResult(true, txn ->
introductionManager2.getMessageHeaders(txn, contactId0From2))
.size());
assertEquals(3, getMessages0From2().size());
assertGroupCount(messageTracker2, g2.getId(), 3, 2);
assertFalse(listener0.aborted);
@@ -635,9 +611,7 @@ public class IntroductionIntegrationTest
assertFalse(listener1.requestReceived);
// make really sure we don't have that request
assertTrue(db1.transactionWithResult(true, txn ->
introductionManager1.getMessageHeaders(txn, contactId0From1))
.isEmpty());
assertTrue(getMessages0From1().isEmpty());
// The message was invalid, so no abort message was sent
assertFalse(listener0.aborted);
@@ -982,6 +956,7 @@ public class IntroductionIntegrationTest
.getMessageMetadataAsDictionary(group0.getId()).size());
// ensure introducer has aborted the session
eventWaiter.await(TIMEOUT, 1); // wait for AbortEvent
assertTrue(listener0.aborted);
// sync REQUEST and ABORT message
@@ -1154,6 +1129,316 @@ public class IntroductionIntegrationTest
);
}
@Test
public void testDeletingAllMessagesWhenCompletingSession()
throws Exception {
addListeners(true, true);
// make introduction
long time = clock.currentTimeMillis();
introductionManager0
.makeIntroduction(contact1From0, contact2From0, "Hi!", time);
// sync first REQUEST message
sync0To1(1, true);
eventWaiter.await(TIMEOUT, 1);
// introducer can not yet remove messages
assertFalse(deleteAllMessages1From0());
// introducee1 can not yet remove messages
assertFalse(deleteAllMessages0From1());
// sync second REQUEST message
sync0To2(1, true);
eventWaiter.await(TIMEOUT, 1);
// introducer can not yet remove messages
assertFalse(deleteAllMessages2From0());
// introducee2 can not yet remove messages
assertFalse(deleteAllMessages0From2());
// sync first ACCEPT message
sync1To0(1, true);
eventWaiter.await(TIMEOUT, 1);
// introducer can not yet remove messages
assertFalse(deleteAllMessages1From0());
// sync second ACCEPT message
sync2To0(1, true);
eventWaiter.await(TIMEOUT, 1);
// introducer can not yet remove messages
assertFalse(deleteAllMessages2From0());
// sync forwarded ACCEPT messages to introducees
sync0To1(1, true);
sync0To2(1, true);
// introducee1 can not yet remove messages
assertFalse(deleteAllMessages0From1());
// introducee2 can not yet remove messages
assertFalse(deleteAllMessages0From2());
// sync first AUTH and its forward
sync1To0(1, true);
sync0To2(1, true);
// introducer can not yet remove messages
assertFalse(deleteAllMessages1From0());
assertFalse(deleteAllMessages2From0());
// introducee2 can not yet remove messages
assertFalse(deleteAllMessages0From2());
// sync second AUTH and its forward as well as the following ACTIVATE
sync2To0(2, true);
sync0To1(2, true);
// introducer can not yet remove messages
assertFalse(deleteAllMessages1From0());
assertFalse(deleteAllMessages2From0());
// introducee1 can not yet remove messages
assertFalse(deleteAllMessages0From1());
// sync second ACTIVATE and its forward
sync1To0(1, true);
sync0To2(1, true);
// wait for introduction to succeed
eventWaiter.await(TIMEOUT, 2);
assertTrue(listener1.succeeded);
assertTrue(listener2.succeeded);
// introducer can now remove messages
assertTrue(deleteAllMessages1From0());
assertEquals(0, getMessages1From0().size());
assertTrue(deleteAllMessages1From0()); // a second time returns true
// introducee1 can not yet remove messages, because last not ACKed
assertFalse(deleteAllMessages0From1());
assertEquals(2, getMessages0From1().size());
// ACK last message
sendAcks(c0, c1, contactId1From0, 1);
// introducee1 can now remove messages
assertTrue(deleteAllMessages0From1());
assertEquals(0, getMessages0From1().size());
assertTrue(deleteAllMessages0From1()); // a second time returns true
// introducee2 can remove messages (last message was incoming)
assertTrue(deleteAllMessages0From2());
assertEquals(0, getMessages0From2().size());
assertTrue(deleteAllMessages0From2()); // a second time returns true
// a new introduction is still possible
assertTrue(introductionManager0
.canIntroduce(contact1From0, contact2From0));
introductionManager0
.makeIntroduction(contact1From0, contact2From0, "Ho!", time);
sync0To1(1, true);
sync0To2(1, true);
// sync responses
sync1To0(1, true);
eventWaiter.await(TIMEOUT, 1);
sync2To0(1, true);
eventWaiter.await(TIMEOUT, 1);
// no one should have aborted until now
assertFalse(listener0.aborted);
assertFalse(listener1.aborted);
assertFalse(listener2.aborted);
// nobody can delete anything again
assertFalse(deleteAllMessages1From0());
assertFalse(deleteAllMessages2From0());
assertFalse(deleteAllMessages0From1());
assertFalse(deleteAllMessages0From2());
}
@Test
public void testDeletingAllMessagesWhenDeclining() throws Exception {
addListeners(false, false);
// make introduction
long time = clock.currentTimeMillis();
introductionManager0
.makeIntroduction(contact1From0, contact2From0, "Hi!", time);
// sync REQUEST messages
sync0To1(1, true);
eventWaiter.await(TIMEOUT, 1);
sync0To2(1, true);
eventWaiter.await(TIMEOUT, 1);
// sync first DECLINE message
sync1To0(1, true);
eventWaiter.await(TIMEOUT, 1);
// introducer can not yet remove messages
assertFalse(deleteAllMessages1From0());
// introducee1 can not yet remove messages
assertFalse(deleteAllMessages0From1());
// sync second DECLINE message
sync2To0(1, true);
eventWaiter.await(TIMEOUT, 1);
// introducer can not yet remove messages
assertFalse(deleteAllMessages2From0());
// introducee2 can not yet remove messages
assertFalse(deleteAllMessages0From2());
// forward first DECLINE message
sync0To2(1, true);
// introducee2 can now remove messages
assertTrue(deleteAllMessages0From2());
assertEquals(0, getMessages0From2().size());
assertTrue(deleteAllMessages0From2()); // a second time nothing happens
// forward second DECLINE message
sync0To1(1, true);
// introducee1 can now remove messages
assertTrue(deleteAllMessages0From1());
assertEquals(0, getMessages0From1().size());
assertTrue(deleteAllMessages0From1()); // a second time nothing happens
// introducer can not yet remove messages
assertFalse(deleteAllMessages1From0());
assertFalse(deleteAllMessages2From0());
// introducer can remove messages after getting ACK from introducee1
sendAcks(c1, c0, contactId0From1, 1);
assertTrue(deleteAllMessages1From0());
assertEquals(0, getMessages1From0().size());
assertTrue(deleteAllMessages1From0()); // a second time nothing happens
// introducer can remove messages after getting ACK from introducee2
sendAcks(c2, c0, contactId0From2, 1);
assertTrue(deleteAllMessages2From0());
assertEquals(0, getMessages2From0().size());
assertTrue(deleteAllMessages2From0()); // a second time nothing happens
// a new introduction is still possible
assertTrue(introductionManager0
.canIntroduce(contact1From0, contact2From0));
time = clock.currentTimeMillis();
introductionManager0
.makeIntroduction(contact1From0, contact2From0, "Ho!", time);
sync0To1(1, true);
sync0To2(1, true);
// sync responses
sync1To0(1, true);
eventWaiter.await(TIMEOUT, 1);
sync2To0(1, true);
eventWaiter.await(TIMEOUT, 1);
// no one should have aborted until now
assertFalse(listener0.aborted);
assertFalse(listener1.aborted);
assertFalse(listener2.aborted);
// nobody can delete anything again
assertFalse(deleteAllMessages1From0());
assertFalse(deleteAllMessages2From0());
assertFalse(deleteAllMessages0From1());
assertFalse(deleteAllMessages0From2());
}
/**
* This test is testing that a session's deletable flag gets reset
* when the session is used again,
* so that it can not cause a session to get deleted prematurely.
*/
@Test
public void testDeletingOneSideOfSession() throws Exception {
addListeners(false, false);
// make introduction
long time = clock.currentTimeMillis();
introductionManager0
.makeIntroduction(contact1From0, contact2From0, "Hi!", time);
// sync REQUEST messages
sync0To1(1, true);
eventWaiter.await(TIMEOUT, 1);
sync0To2(1, true);
eventWaiter.await(TIMEOUT, 1);
// sync DECLINE messages
sync1To0(1, true);
eventWaiter.await(TIMEOUT, 1);
sync2To0(1, true);
eventWaiter.await(TIMEOUT, 1);
// forward DECLINE messages
sync0To2(1, true);
sync0To1(1, true);
// introducer can remove messages after getting ACK from introducee1
sendAcks(c1, c0, contactId0From1, 1);
assertTrue(deleteAllMessages1From0());
assertEquals(0, getMessages1From0().size());
assertTrue(deleteAllMessages1From0()); // a second time nothing happens
// a new introduction is still possible
assertTrue(introductionManager0
.canIntroduce(contact1From0, contact2From0));
time = clock.currentTimeMillis();
introductionManager0
.makeIntroduction(contact1From0, contact2From0, "Ho!", time);
sync0To1(1, true);
sync0To2(1, true);
// sync and forward DECLINE messages
sync1To0(1, true);
sync2To0(1, true);
sync0To2(1, true);
sync0To1(1, true);
// introducer can remove messages after getting ACK from introducee1
sendAcks(c1, c0, contactId0From1, 1);
assertTrue(deleteAllMessages1From0());
assertEquals(0, getMessages1From0().size());
assertTrue(deleteAllMessages1From0()); // a second time nothing happens
// introducer can remove messages after getting ACK from introducee2
// if this succeeds, we still had the session object after delete above
sendAcks(c2, c0, contactId0From2, 1);
assertTrue(deleteAllMessages2From0());
assertEquals(0, getMessages2From0().size());
assertTrue(deleteAllMessages2From0()); // a second time nothing happens
// no one should have aborted
assertFalse(listener0.aborted);
assertFalse(listener1.aborted);
assertFalse(listener2.aborted);
}
private boolean deleteAllMessages1From0() throws DbException {
return db0.transactionWithResult(false, txn -> introductionManager0
.deleteAllMessages(txn, contactId1From0));
}
private boolean deleteAllMessages2From0() throws DbException {
return db0.transactionWithResult(false, txn -> introductionManager0
.deleteAllMessages(txn, contactId2From0));
}
private boolean deleteAllMessages0From1() throws DbException {
return db1.transactionWithResult(false, txn -> introductionManager1
.deleteAllMessages(txn, contactId0From1));
}
private boolean deleteAllMessages0From2() throws DbException {
return db2.transactionWithResult(false, txn -> introductionManager2
.deleteAllMessages(txn, contactId0From2));
}
private void addTransportProperties() throws Exception {
TransportPropertyManager tpm0 = c0.getTransportPropertyManager();
TransportPropertyManager tpm1 = c1.getTransportPropertyManager();
@@ -1174,28 +1459,47 @@ public class IntroductionIntegrationTest
}
private void assertDefaultUiMessages() throws DbException {
Collection<ConversationMessageHeader> messages =
db0.transactionWithResult(true, txn -> introductionManager0
.getMessageHeaders(txn, contactId1From0));
Collection<ConversationMessageHeader> messages = getMessages1From0();
assertEquals(2, messages.size());
assertMessagesAreAcked(messages);
messages = db0.transactionWithResult(true, txn ->
introductionManager0.getMessageHeaders(txn, contactId2From0));
messages = getMessages2From0();
assertEquals(2, messages.size());
assertMessagesAreAcked(messages);
messages = db1.transactionWithResult(true, txn ->
introductionManager1.getMessageHeaders(txn, contactId0From1));
messages = getMessages0From1();
assertEquals(2, messages.size());
assertMessagesAreAcked(messages);
messages = db2.transactionWithResult(true, txn ->
introductionManager2.getMessageHeaders(txn, contactId0From2));
messages = getMessages0From2();
assertEquals(2, messages.size());
assertMessagesAreAcked(messages);
}
private Collection<ConversationMessageHeader> getMessages1From0()
throws DbException {
return db0.transactionWithResult(true, txn -> introductionManager0
.getMessageHeaders(txn, contactId1From0));
}
private Collection<ConversationMessageHeader> getMessages2From0()
throws DbException {
return db0.transactionWithResult(true, txn -> introductionManager0
.getMessageHeaders(txn, contactId2From0));
}
private Collection<ConversationMessageHeader> getMessages0From1()
throws DbException {
return db1.transactionWithResult(true, txn -> introductionManager1
.getMessageHeaders(txn, contactId0From1));
}
private Collection<ConversationMessageHeader> getMessages0From2()
throws DbException {
return db2.transactionWithResult(true, txn -> introductionManager2
.getMessageHeaders(txn, contactId0From2));
}
private void assertMessagesAreAcked(
Collection<ConversationMessageHeader> messages) {
for (ConversationMessageHeader msg : messages) {

View File

@@ -25,6 +25,7 @@ import org.briarproject.bramble.api.nullsafety.ParametersNotNullByDefault;
import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.event.MessageStateChangedEvent;
import org.briarproject.bramble.api.sync.event.MessagesAckedEvent;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.test.TestTransportConnectionReader;
import org.briarproject.bramble.test.TestTransportConnectionWriter;
@@ -123,11 +124,14 @@ public abstract class BriarIntegrationTest<C extends BriarIntegrationTestCompone
// objects accessed from background threads need to be volatile
private volatile Waiter validationWaiter;
private volatile Waiter deliveryWaiter;
private volatile Waiter ackWaiter;
private volatile boolean expectAck = false;
protected C c0, c1, c2;
private final Semaphore messageSemaphore = new Semaphore(0);
private final AtomicInteger messageCounter = new AtomicInteger(0);
private final AtomicInteger ackCounter = new AtomicInteger(0);
private final File testDir = TestUtils.getTestDirectory();
private final String AUTHOR0 = "Author 0";
private final String AUTHOR1 = "Author 1";
@@ -158,6 +162,7 @@ public abstract class BriarIntegrationTest<C extends BriarIntegrationTestCompone
// initialize waiters fresh for each test
validationWaiter = new Waiter();
deliveryWaiter = new Waiter();
ackWaiter = new Waiter();
createAndRegisterIdentities();
startLifecycles();
@@ -219,6 +224,13 @@ public abstract class BriarIntegrationTest<C extends BriarIntegrationTestCompone
validationWaiter.resume();
}
}
} else if (e instanceof MessagesAckedEvent && expectAck) {
MessagesAckedEvent event = (MessagesAckedEvent) e;
ackCounter.addAndGet(event.getMessageIds().size());
for (MessageId m : event.getMessageIds()) {
loadAndLogMessage(m);
ackWaiter.resume();
}
}
}
@@ -381,6 +393,46 @@ public abstract class BriarIntegrationTest<C extends BriarIntegrationTestCompone
}
}
protected void sendAcks(BriarIntegrationTestComponent fromComponent,
BriarIntegrationTestComponent toComponent, ContactId toId, int num)
throws Exception {
// Debug output
String from =
fromComponent.getIdentityManager().getLocalAuthor().getName();
String to = toComponent.getIdentityManager().getLocalAuthor().getName();
LOG.info("TEST: Sending " + num + " ACKs from " + from + " to " + to);
expectAck = true;
// start outgoing connection
ByteArrayOutputStream out = new ByteArrayOutputStream();
TestTransportConnectionWriter writer =
new TestTransportConnectionWriter(out);
fromComponent.getConnectionManager().manageOutgoingConnection(toId,
SIMPLEX_TRANSPORT_ID, writer);
writer.getDisposedLatch().await(TIMEOUT, MILLISECONDS);
// handle incoming connection
ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
TestTransportConnectionReader reader =
new TestTransportConnectionReader(in);
toComponent.getConnectionManager().manageIncomingConnection(
SIMPLEX_TRANSPORT_ID, reader);
ackWaiter.await(TIMEOUT, num);
assertEquals("ACKs delivered", num, ackCounter.getAndSet(0));
assertEquals("No messages delivered", 0, messageCounter.get());
try {
messageSemaphore.tryAcquire(num, TIMEOUT, MILLISECONDS);
} catch (InterruptedException e) {
LOG.info("Interrupted while waiting for messages");
Thread.currentThread().interrupt();
fail();
} finally {
expectAck = false;
}
}
protected void removeAllContacts() throws DbException {
contactManager0.removeContact(contactId1From0);
contactManager0.removeContact(contactId2From0);