Merge branch '1627-delete-completed-introduction-sessions' into 'master'

Delete conversation messages belonging to completed introduction sessions

See merge request briar/briar!1163
This commit is contained in:
akwizgran
2019-10-09 12:39:05 +00:00
6 changed files with 520 additions and 54 deletions

View File

@@ -29,6 +29,11 @@ enum IntroduceeState implements State {
return value;
}
@Override
public boolean isComplete() {
return this == START;
}
static IntroduceeState fromValue(int value) throws FormatException {
for (IntroduceeState s : values()) if (s.value == value) return s;
throw new FormatException();

View File

@@ -29,6 +29,11 @@ enum IntroducerState implements State {
return value;
}
@Override
public boolean isComplete() {
return this == START;
}
static IntroducerState fromValue(int value) throws FormatException {
for (IntroducerState s : values()) if (s.value == value) return s;
throw new FormatException();

View File

@@ -42,6 +42,7 @@ import org.briarproject.briar.introduction.IntroducerSession.Introducee;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -319,7 +320,7 @@ class IntroductionManagerImpl extends ConversationClientImpl
if (ss == null) return true;
IntroducerSession session =
sessionParser.parseIntroducerSession(ss.bdfSession);
return session.getState() == START;
return session.getState().isComplete();
}
@Override
@@ -561,8 +562,100 @@ class IntroductionManagerImpl extends ConversationClientImpl
@Override
public boolean deleteAllMessages(Transaction txn, ContactId c)
throws DbException {
// TODO actually delete messages (#1627 and #1629)
return getMessageIds(txn, c).size() == 0;
// get ID of the contact group
GroupId g = getContactGroup(db.getContact(txn, c)).getId();
// get metadata for all messages in the group
Map<MessageId, BdfDictionary> messages;
try {
messages = clientHelper.getMessageMetadataAsDictionary(txn, g);
} catch (FormatException e) {
throw new DbException(e);
}
// assign protocol messages to their sessions
Map<SessionId, DeletableSession> sessions = new HashMap<>();
for (Entry<MessageId, BdfDictionary> entry : messages.entrySet()) {
MessageMetadata m;
try {
m = messageParser.parseMetadata(entry.getValue());
} catch (FormatException e) {
throw new DbException(e);
}
if (m.getSessionId() == null) {
// This can only be an unhandled REQUEST message.
// Its session is created and stored in incomingMessage(),
// and getMessageMetadata() only returns delivered messages,
// so the session ID should have been assigned.
throw new AssertionError("missing session ID");
}
// get session from map or database
DeletableSession session = sessions.get(m.getSessionId());
if (session == null) {
session = getDeletableSession(txn, g, m.getSessionId());
sessions.put(m.getSessionId(), session);
}
session.messages.add(entry.getKey());
}
// get a set of all messages which were not ACKed by the contact
Set<MessageId> notAcked = new HashSet<>();
for (MessageStatus status : db.getMessageStatus(txn, c, g)) {
if (!status.isSeen()) notAcked.add(status.getMessageId());
}
return deleteCompletedSessions(txn, sessions, notAcked);
}
private DeletableSession getDeletableSession(Transaction txn,
GroupId introducerGroupId, SessionId sessionId) throws DbException {
try {
StoredSession ss = getSession(txn, sessionId);
if (ss == null) throw new AssertionError();
Session s;
Role role = sessionParser.getRole(ss.bdfSession);
if (role == INTRODUCER) {
s = sessionParser.parseIntroducerSession(ss.bdfSession);
} else if (role == INTRODUCEE) {
s = sessionParser.parseIntroduceeSession(introducerGroupId,
ss.bdfSession);
} else throw new AssertionError();
return new DeletableSession(s.getState());
} catch (FormatException e) {
throw new DbException(e);
}
}
private boolean deleteCompletedSessions(Transaction txn,
Map<SessionId, DeletableSession> sessions, Set<MessageId> notAcked)
throws DbException {
// find completed sessions to delete
boolean allDeleted = true;
for (DeletableSession session : sessions.values()) {
if (!session.state.isComplete()) {
allDeleted = false;
continue;
}
// we can only delete sessions
// where delivery of all messages was confirmed (aka ACKed)
boolean allAcked = true;
for (MessageId m : session.messages) {
if (notAcked.contains(m)) {
allAcked = false;
allDeleted = false;
break;
}
}
// delete messages of session, if all were ACKed
if (allAcked) {
for (MessageId m : session.messages) {
db.deleteMessage(txn, m);
db.deleteMessageMetadata(txn, m);
}
// we can not delete the session as it might get restarted
// and then needs the previous MessageIds
}
}
return allDeleted;
}
private Set<MessageId> getMessageIds(Transaction txn, ContactId c)
@@ -589,4 +682,14 @@ class IntroductionManagerImpl extends ConversationClientImpl
}
}
private static class DeletableSession {
private final State state;
private final List<MessageId> messages = new ArrayList<>();
private DeletableSession(State state) {
this.state = state;
}
}
}

View File

@@ -4,4 +4,6 @@ interface State {
int getValue();
boolean isComplete();
}

View File

@@ -142,14 +142,10 @@ public class IntroductionIntegrationTest
assertGroupCount(messageTracker0, g2.getId(), 1, 0);
// check that request message states are correct
Collection<ConversationMessageHeader> messages =
db0.transactionWithResult(true, txn -> introductionManager0
.getMessageHeaders(txn, contactId1From0));
Collection<ConversationMessageHeader> messages = getMessages1From0();
assertEquals(1, messages.size());
assertMessageState(messages.iterator().next(), true, false, false);
messages =
db0.transactionWithResult(true, txn -> introductionManager0
.getMessageHeaders(txn, contactId2From0));
messages = getMessages2From0();
assertEquals(1, messages.size());
assertMessageState(messages.iterator().next(), true, false, false);
@@ -162,9 +158,7 @@ public class IntroductionIntegrationTest
assertGroupCount(messageTracker1, g1.getId(), 2, 1);
// check that accept message state is correct
messages =
db1.transactionWithResult(true, txn -> introductionManager1
.getMessageHeaders(txn, contactId0From1));
messages = getMessages0From1();
assertEquals(2, messages.size());
for (ConversationMessageHeader h : messages) {
if (h instanceof ConversationResponse) {
@@ -327,22 +321,17 @@ public class IntroductionIntegrationTest
Group g1 = introductionManager0.getContactGroup(introducee1);
Group g2 = introductionManager0.getContactGroup(introducee2);
Collection<ConversationMessageHeader> messages =
db0.transactionWithResult(true, txn -> introductionManager0
.getMessageHeaders(txn, contactId1From0));
Collection<ConversationMessageHeader> messages = getMessages1From0();
assertEquals(2, messages.size());
assertGroupCount(messageTracker0, g1.getId(), 2, 1);
messages = db0.transactionWithResult(true, txn ->
introductionManager0.getMessageHeaders(txn, contactId2From0));
messages = getMessages2From0();
assertEquals(2, messages.size());
assertGroupCount(messageTracker0, g2.getId(), 2, 1);
messages = db1.transactionWithResult(true, txn ->
introductionManager1.getMessageHeaders(txn, contactId0From1));
messages = getMessages0From1();
assertEquals(2, messages.size());
assertGroupCount(messageTracker1, g1.getId(), 2, 1);
// introducee2 should also have the decline response of introducee1
messages = db2.transactionWithResult(true, txn ->
introductionManager2.getMessageHeaders(txn, contactId0From2));
messages = getMessages0From2();
assertEquals(3, messages.size());
assertGroupCount(messageTracker2, g2.getId(), 3, 2);
@@ -394,18 +383,13 @@ public class IntroductionIntegrationTest
assertFalse(contactManager2
.contactExists(author1.getId(), author2.getId()));
Collection<ConversationMessageHeader> messages =
db0.transactionWithResult(true, txn -> introductionManager0
.getMessageHeaders(txn, contactId1From0));
Collection<ConversationMessageHeader> messages = getMessages1From0();
assertEquals(2, messages.size());
messages = db0.transactionWithResult(true, txn ->
introductionManager0.getMessageHeaders(txn, contactId2From0));
messages = getMessages2From0();
assertEquals(2, messages.size());
messages = db1.transactionWithResult(true, txn ->
introductionManager1.getMessageHeaders(txn, contactId0From1));
messages = getMessages0From1();
assertEquals(3, messages.size());
messages = db2.transactionWithResult(true, txn ->
introductionManager2.getMessageHeaders(txn, contactId0From2));
messages = getMessages0From2();
assertEquals(2, messages.size());
assertFalse(listener0.aborted);
assertFalse(listener1.aborted);
@@ -597,21 +581,13 @@ public class IntroductionIntegrationTest
Group g1 = introductionManager0.getContactGroup(introducee1);
Group g2 = introductionManager0.getContactGroup(introducee2);
assertEquals(2, db0.transactionWithResult(true, txn ->
introductionManager0.getMessageHeaders(txn, contactId1From0))
.size());
assertEquals(2, getMessages1From0().size());
assertGroupCount(messageTracker0, g1.getId(), 2, 1);
assertEquals(2, db0.transactionWithResult(true, txn ->
introductionManager0.getMessageHeaders(txn, contactId2From0))
.size());
assertEquals(2, getMessages2From0().size());
assertGroupCount(messageTracker0, g2.getId(), 2, 1);
assertEquals(2, db1.transactionWithResult(true, txn ->
introductionManager1.getMessageHeaders(txn, contactId0From1))
.size());
assertEquals(2, getMessages0From1().size());
assertGroupCount(messageTracker1, g1.getId(), 2, 1);
assertEquals(3, db2.transactionWithResult(true, txn ->
introductionManager2.getMessageHeaders(txn, contactId0From2))
.size());
assertEquals(3, getMessages0From2().size());
assertGroupCount(messageTracker2, g2.getId(), 3, 2);
assertFalse(listener0.aborted);
@@ -635,9 +611,7 @@ public class IntroductionIntegrationTest
assertFalse(listener1.requestReceived);
// make really sure we don't have that request
assertTrue(db1.transactionWithResult(true, txn ->
introductionManager1.getMessageHeaders(txn, contactId0From1))
.isEmpty());
assertTrue(getMessages0From1().isEmpty());
// The message was invalid, so no abort message was sent
assertFalse(listener0.aborted);
@@ -982,6 +956,7 @@ public class IntroductionIntegrationTest
.getMessageMetadataAsDictionary(group0.getId()).size());
// ensure introducer has aborted the session
eventWaiter.await(TIMEOUT, 1); // wait for AbortEvent
assertTrue(listener0.aborted);
// sync REQUEST and ABORT message
@@ -1154,6 +1129,311 @@ public class IntroductionIntegrationTest
);
}
@Test
public void testDeletingAllMessagesWhenCompletingSession()
throws Exception {
addListeners(true, true);
// make introduction
long time = clock.currentTimeMillis();
introductionManager0
.makeIntroduction(contact1From0, contact2From0, "Hi!", time);
// sync first REQUEST message
sync0To1(1, true);
eventWaiter.await(TIMEOUT, 1);
// introducer can not yet remove messages
assertFalse(deleteAllMessages1From0());
// introducee1 can not yet remove messages
assertFalse(deleteAllMessages0From1());
// sync second REQUEST message
sync0To2(1, true);
eventWaiter.await(TIMEOUT, 1);
// introducer can not yet remove messages
assertFalse(deleteAllMessages2From0());
// introducee2 can not yet remove messages
assertFalse(deleteAllMessages0From2());
// sync first ACCEPT message
sync1To0(1, true);
eventWaiter.await(TIMEOUT, 1);
// introducer can not yet remove messages
assertFalse(deleteAllMessages1From0());
// sync second ACCEPT message
sync2To0(1, true);
eventWaiter.await(TIMEOUT, 1);
// introducer can not yet remove messages
assertFalse(deleteAllMessages2From0());
// sync forwarded ACCEPT messages to introducees
sync0To1(1, true);
sync0To2(1, true);
// introducee1 can not yet remove messages
assertFalse(deleteAllMessages0From1());
// introducee2 can not yet remove messages
assertFalse(deleteAllMessages0From2());
// sync first AUTH and its forward
sync1To0(1, true);
sync0To2(1, true);
// introducer can not yet remove messages
assertFalse(deleteAllMessages1From0());
assertFalse(deleteAllMessages2From0());
// introducee2 can not yet remove messages
assertFalse(deleteAllMessages0From2());
// sync second AUTH and its forward as well as the following ACTIVATE
sync2To0(2, true);
sync0To1(2, true);
// introducer can not yet remove messages
assertFalse(deleteAllMessages1From0());
assertFalse(deleteAllMessages2From0());
// introducee1 can not yet remove messages
assertFalse(deleteAllMessages0From1());
// sync second ACTIVATE and its forward
sync1To0(1, true);
sync0To2(1, true);
// wait for introduction to succeed
eventWaiter.await(TIMEOUT, 2);
assertTrue(listener1.succeeded);
assertTrue(listener2.succeeded);
// introducer can now remove messages
assertTrue(deleteAllMessages1From0());
assertEquals(0, getMessages1From0().size());
assertTrue(deleteAllMessages1From0()); // a second time returns true
// introducee1 can not yet remove messages, because last not ACKed
assertFalse(deleteAllMessages0From1());
assertEquals(2, getMessages0From1().size());
// ACK last message
sendAcks(c0, c1, contactId1From0, 1);
// introducee1 can now remove messages
assertTrue(deleteAllMessages0From1());
assertEquals(0, getMessages0From1().size());
assertTrue(deleteAllMessages0From1()); // a second time returns true
// introducee2 can remove messages (last message was incoming)
assertTrue(deleteAllMessages0From2());
assertEquals(0, getMessages0From2().size());
assertTrue(deleteAllMessages0From2()); // a second time returns true
// a new introduction is still possible
assertTrue(introductionManager0
.canIntroduce(contact1From0, contact2From0));
introductionManager0
.makeIntroduction(contact1From0, contact2From0, "Ho!", time);
sync0To1(1, true);
sync0To2(1, true);
// sync responses
sync1To0(1, true);
eventWaiter.await(TIMEOUT, 1);
sync2To0(1, true);
eventWaiter.await(TIMEOUT, 1);
// no one should have aborted until now
assertFalse(listener0.aborted);
assertFalse(listener1.aborted);
assertFalse(listener2.aborted);
// nobody can delete anything again
assertFalse(deleteAllMessages1From0());
assertFalse(deleteAllMessages2From0());
assertFalse(deleteAllMessages0From1());
assertFalse(deleteAllMessages0From2());
}
@Test
public void testDeletingAllMessagesWhenDeclining() throws Exception {
addListeners(false, false);
// make introduction
long time = clock.currentTimeMillis();
introductionManager0
.makeIntroduction(contact1From0, contact2From0, "Hi!", time);
// sync REQUEST messages
sync0To1(1, true);
eventWaiter.await(TIMEOUT, 1);
sync0To2(1, true);
eventWaiter.await(TIMEOUT, 1);
// sync first DECLINE message
sync1To0(1, true);
eventWaiter.await(TIMEOUT, 1);
// introducer can not yet remove messages
assertFalse(deleteAllMessages1From0());
// introducee1 can not yet remove messages
assertFalse(deleteAllMessages0From1());
// sync second DECLINE message
sync2To0(1, true);
eventWaiter.await(TIMEOUT, 1);
// introducer can not yet remove messages
assertFalse(deleteAllMessages2From0());
// introducee2 can not yet remove messages
assertFalse(deleteAllMessages0From2());
// forward first DECLINE message
sync0To2(1, true);
// introducee2 can now remove messages
assertTrue(deleteAllMessages0From2());
assertEquals(0, getMessages0From2().size());
assertTrue(deleteAllMessages0From2()); // a second time nothing happens
// forward second DECLINE message
sync0To1(1, true);
// introducee1 can now remove messages
assertTrue(deleteAllMessages0From1());
assertEquals(0, getMessages0From1().size());
assertTrue(deleteAllMessages0From1()); // a second time nothing happens
// introducer can not yet remove messages
assertFalse(deleteAllMessages1From0());
assertFalse(deleteAllMessages2From0());
// introducer can remove messages after getting ACK from introducee1
sendAcks(c1, c0, contactId0From1, 1);
assertTrue(deleteAllMessages1From0());
assertEquals(0, getMessages1From0().size());
assertTrue(deleteAllMessages1From0()); // a second time nothing happens
// introducer can remove messages after getting ACK from introducee2
sendAcks(c2, c0, contactId0From2, 1);
assertTrue(deleteAllMessages2From0());
assertEquals(0, getMessages2From0().size());
assertTrue(deleteAllMessages2From0()); // a second time nothing happens
// a new introduction is still possible
assertTrue(introductionManager0
.canIntroduce(contact1From0, contact2From0));
time = clock.currentTimeMillis();
introductionManager0
.makeIntroduction(contact1From0, contact2From0, "Ho!", time);
sync0To1(1, true);
sync0To2(1, true);
// sync responses
sync1To0(1, true);
eventWaiter.await(TIMEOUT, 1);
sync2To0(1, true);
eventWaiter.await(TIMEOUT, 1);
// no one should have aborted until now
assertFalse(listener0.aborted);
assertFalse(listener1.aborted);
assertFalse(listener2.aborted);
// nobody can delete anything again
assertFalse(deleteAllMessages1From0());
assertFalse(deleteAllMessages2From0());
assertFalse(deleteAllMessages0From1());
assertFalse(deleteAllMessages0From2());
}
@Test
public void testDeletingOneSideOfSession() throws Exception {
addListeners(false, false);
// make introduction
long time = clock.currentTimeMillis();
introductionManager0
.makeIntroduction(contact1From0, contact2From0, "Hi!", time);
// sync REQUEST messages
sync0To1(1, true);
eventWaiter.await(TIMEOUT, 1);
sync0To2(1, true);
eventWaiter.await(TIMEOUT, 1);
// sync DECLINE messages
sync1To0(1, true);
eventWaiter.await(TIMEOUT, 1);
sync2To0(1, true);
eventWaiter.await(TIMEOUT, 1);
// forward DECLINE messages
sync0To2(1, true);
sync0To1(1, true);
// introducer can remove messages after getting ACK from introducee1
sendAcks(c1, c0, contactId0From1, 1);
assertTrue(deleteAllMessages1From0());
assertEquals(0, getMessages1From0().size());
assertTrue(deleteAllMessages1From0()); // a second time nothing happens
// a new introduction is still possible
assertTrue(introductionManager0
.canIntroduce(contact1From0, contact2From0));
time = clock.currentTimeMillis();
introductionManager0
.makeIntroduction(contact1From0, contact2From0, "Ho!", time);
sync0To1(1, true);
sync0To2(1, true);
// sync and forward DECLINE messages
sync1To0(1, true);
sync2To0(1, true);
sync0To2(1, true);
sync0To1(1, true);
// introducer can remove messages after getting ACK from introducee1
sendAcks(c1, c0, contactId0From1, 1);
assertTrue(deleteAllMessages1From0());
assertEquals(0, getMessages1From0().size());
assertTrue(deleteAllMessages1From0()); // a second time nothing happens
// introducer can remove messages after getting ACK from introducee2
// if this succeeds, we still had the session object after delete above
sendAcks(c2, c0, contactId0From2, 1);
assertTrue(deleteAllMessages2From0());
assertEquals(0, getMessages2From0().size());
assertTrue(deleteAllMessages2From0()); // a second time nothing happens
// no one should have aborted
assertFalse(listener0.aborted);
assertFalse(listener1.aborted);
assertFalse(listener2.aborted);
}
private boolean deleteAllMessages1From0() throws DbException {
return db0.transactionWithResult(false, txn -> introductionManager0
.deleteAllMessages(txn, contactId1From0));
}
private boolean deleteAllMessages2From0() throws DbException {
return db0.transactionWithResult(false, txn -> introductionManager0
.deleteAllMessages(txn, contactId2From0));
}
private boolean deleteAllMessages0From1() throws DbException {
return db1.transactionWithResult(false, txn -> introductionManager1
.deleteAllMessages(txn, contactId0From1));
}
private boolean deleteAllMessages0From2() throws DbException {
return db2.transactionWithResult(false, txn -> introductionManager2
.deleteAllMessages(txn, contactId0From2));
}
private void addTransportProperties() throws Exception {
TransportPropertyManager tpm0 = c0.getTransportPropertyManager();
TransportPropertyManager tpm1 = c1.getTransportPropertyManager();
@@ -1174,28 +1454,47 @@ public class IntroductionIntegrationTest
}
private void assertDefaultUiMessages() throws DbException {
Collection<ConversationMessageHeader> messages =
db0.transactionWithResult(true, txn -> introductionManager0
.getMessageHeaders(txn, contactId1From0));
Collection<ConversationMessageHeader> messages = getMessages1From0();
assertEquals(2, messages.size());
assertMessagesAreAcked(messages);
messages = db0.transactionWithResult(true, txn ->
introductionManager0.getMessageHeaders(txn, contactId2From0));
messages = getMessages2From0();
assertEquals(2, messages.size());
assertMessagesAreAcked(messages);
messages = db1.transactionWithResult(true, txn ->
introductionManager1.getMessageHeaders(txn, contactId0From1));
messages = getMessages0From1();
assertEquals(2, messages.size());
assertMessagesAreAcked(messages);
messages = db2.transactionWithResult(true, txn ->
introductionManager2.getMessageHeaders(txn, contactId0From2));
messages = getMessages0From2();
assertEquals(2, messages.size());
assertMessagesAreAcked(messages);
}
private Collection<ConversationMessageHeader> getMessages1From0()
throws DbException {
return db0.transactionWithResult(true, txn -> introductionManager0
.getMessageHeaders(txn, contactId1From0));
}
private Collection<ConversationMessageHeader> getMessages2From0()
throws DbException {
return db0.transactionWithResult(true, txn -> introductionManager0
.getMessageHeaders(txn, contactId2From0));
}
private Collection<ConversationMessageHeader> getMessages0From1()
throws DbException {
return db1.transactionWithResult(true, txn -> introductionManager1
.getMessageHeaders(txn, contactId0From1));
}
private Collection<ConversationMessageHeader> getMessages0From2()
throws DbException {
return db2.transactionWithResult(true, txn -> introductionManager2
.getMessageHeaders(txn, contactId0From2));
}
private void assertMessagesAreAcked(
Collection<ConversationMessageHeader> messages) {
for (ConversationMessageHeader msg : messages) {

View File

@@ -25,6 +25,7 @@ import org.briarproject.bramble.api.nullsafety.ParametersNotNullByDefault;
import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.event.MessageStateChangedEvent;
import org.briarproject.bramble.api.sync.event.MessagesAckedEvent;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.test.TestTransportConnectionReader;
import org.briarproject.bramble.test.TestTransportConnectionWriter;
@@ -123,11 +124,14 @@ public abstract class BriarIntegrationTest<C extends BriarIntegrationTestCompone
// objects accessed from background threads need to be volatile
private volatile Waiter validationWaiter;
private volatile Waiter deliveryWaiter;
private volatile Waiter ackWaiter;
private volatile boolean expectAck = false;
protected C c0, c1, c2;
private final Semaphore messageSemaphore = new Semaphore(0);
private final AtomicInteger messageCounter = new AtomicInteger(0);
private final AtomicInteger ackCounter = new AtomicInteger(0);
private final File testDir = TestUtils.getTestDirectory();
private final String AUTHOR0 = "Author 0";
private final String AUTHOR1 = "Author 1";
@@ -158,6 +162,7 @@ public abstract class BriarIntegrationTest<C extends BriarIntegrationTestCompone
// initialize waiters fresh for each test
validationWaiter = new Waiter();
deliveryWaiter = new Waiter();
ackWaiter = new Waiter();
createAndRegisterIdentities();
startLifecycles();
@@ -219,6 +224,13 @@ public abstract class BriarIntegrationTest<C extends BriarIntegrationTestCompone
validationWaiter.resume();
}
}
} else if (e instanceof MessagesAckedEvent && expectAck) {
MessagesAckedEvent event = (MessagesAckedEvent) e;
ackCounter.addAndGet(event.getMessageIds().size());
for (MessageId m : event.getMessageIds()) {
loadAndLogMessage(m);
ackWaiter.resume();
}
}
}
@@ -381,6 +393,46 @@ public abstract class BriarIntegrationTest<C extends BriarIntegrationTestCompone
}
}
protected void sendAcks(BriarIntegrationTestComponent fromComponent,
BriarIntegrationTestComponent toComponent, ContactId toId, int num)
throws Exception {
// Debug output
String from =
fromComponent.getIdentityManager().getLocalAuthor().getName();
String to = toComponent.getIdentityManager().getLocalAuthor().getName();
LOG.info("TEST: Sending " + num + " ACKs from " + from + " to " + to);
expectAck = true;
// start outgoing connection
ByteArrayOutputStream out = new ByteArrayOutputStream();
TestTransportConnectionWriter writer =
new TestTransportConnectionWriter(out);
fromComponent.getConnectionManager().manageOutgoingConnection(toId,
SIMPLEX_TRANSPORT_ID, writer);
writer.getDisposedLatch().await(TIMEOUT, MILLISECONDS);
// handle incoming connection
ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
TestTransportConnectionReader reader =
new TestTransportConnectionReader(in);
toComponent.getConnectionManager().manageIncomingConnection(
SIMPLEX_TRANSPORT_ID, reader);
ackWaiter.await(TIMEOUT, num);
assertEquals("ACKs delivered", num, ackCounter.getAndSet(0));
assertEquals("No messages delivered", 0, messageCounter.get());
try {
messageSemaphore.tryAcquire(num, TIMEOUT, MILLISECONDS);
} catch (InterruptedException e) {
LOG.info("Interrupted while waiting for messages");
Thread.currentThread().interrupt();
fail();
} finally {
expectAck = false;
}
}
protected void removeAllContacts() throws DbException {
contactManager0.removeContact(contactId1From0);
contactManager0.removeContact(contactId2From0);