Count sent messages in integration tests.

This commit is contained in:
akwizgran
2021-03-03 12:51:44 +00:00
parent af55d181e9
commit 9bc3a2c73d
6 changed files with 123 additions and 75 deletions

View File

@@ -4,9 +4,6 @@ import org.briarproject.bramble.api.contact.Contact;
import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.db.DatabaseComponent; import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DbException; import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.event.EventListener;
import org.briarproject.bramble.api.sync.GroupId; import org.briarproject.bramble.api.sync.GroupId;
import org.briarproject.bramble.api.sync.MessageId; import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.system.TimeTravelModule; import org.briarproject.bramble.system.TimeTravelModule;
@@ -24,12 +21,8 @@ import org.junit.Before;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Nonnull;
import static java.util.Collections.sort; import static java.util.Collections.sort;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.briarproject.bramble.api.cleanup.CleanupManager.BATCH_DELAY_MS; import static org.briarproject.bramble.api.cleanup.CleanupManager.BATCH_DELAY_MS;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@@ -151,30 +144,4 @@ public abstract class AbstractAutoDeleteTest extends
assertEquals("messageCount", messageCount, gc.getMsgCount()); assertEquals("messageCount", messageCount, gc.getMsgCount());
assertEquals("unreadCount", unreadCount, gc.getUnreadCount()); assertEquals("unreadCount", unreadCount, gc.getUnreadCount());
} }
/**
* Broadcasts a marker event and waits for it to be delivered, which
* indicates that all previously broadcast events have been delivered.
*/
protected void waitForEvents(BriarIntegrationTestComponent component)
throws Exception {
CountDownLatch latch = new CountDownLatch(1);
MarkerEvent marker = new MarkerEvent();
EventBus eventBus = component.getEventBus();
eventBus.addListener(new EventListener() {
@Override
public void eventOccurred(@Nonnull Event e) {
if (e == marker) {
latch.countDown();
eventBus.removeListener(this);
}
}
});
eventBus.broadcast(marker);
if (!latch.await(1, MINUTES)) fail();
}
private static class MarkerEvent extends Event {
}
} }

View File

@@ -166,25 +166,27 @@ public class ForumManagerTest
@Test @Test
public void testForumPostDeliveredAfterParent() throws Exception { public void testForumPostDeliveredAfterParent() throws Exception {
// add one forum post without the parent // Add a parent post and a child post
long time = c0.getClock().currentTimeMillis(); long time = c0.getClock().currentTimeMillis();
ForumPost post1 = createForumPost(groupId0, null, "a", time); ForumPost post1 = createForumPost(groupId0, null, "a", time);
ForumPost post2 = createForumPost(groupId0, post1, "a", time); ForumPost post2 = createForumPost(groupId0, post1, "a", time);
forumManager0.addLocalPost(post2);
assertEquals(1, forumManager0.getPostHeaders(groupId0).size());
assertEquals(0, forumManager1.getPostHeaders(groupId0).size());
// send post to 1 without waiting for message delivery
sync0To1(1, false);
assertEquals(0, forumManager1.getPostHeaders(groupId0).size());
// now add the parent post as well
forumManager0.addLocalPost(post1); forumManager0.addLocalPost(post1);
forumManager0.addLocalPost(post2);
assertEquals(2, forumManager0.getPostHeaders(groupId0).size()); assertEquals(2, forumManager0.getPostHeaders(groupId0).size());
assertEquals(0, forumManager1.getPostHeaders(groupId0).size()); assertEquals(0, forumManager1.getPostHeaders(groupId0).size());
// and send it over to 1 and wait for a second message to be delivered // Unshare the parent so it won't be sent yet
sync0To1(2, true); setMessageNotShared(c0, post1.getMessage().getId());
// Send the child post to 1 - it should be validated but not delivered
// yet, as the parent is missing, and no headers should be returned yet
syncMessage(c0, c1, contactId1From0, 1, 0, 1, 0);
assertEquals(0, forumManager1.getPostHeaders(groupId0).size());
// Now send the parent post to 1 - it should be validated, both
// posts should be delivered, and both headers should be returned
setMessageShared(c0, post1.getMessage().getId());
syncMessage(c0, c1, contactId1From0, 1, 0, 0, 2);
assertEquals(2, forumManager1.getPostHeaders(groupId0).size()); assertEquals(2, forumManager1.getPostHeaders(groupId0).size());
} }

View File

@@ -729,20 +729,6 @@ public class AutoDeleteIntegrationTest extends AbstractAutoDeleteTest {
component.getClock().currentTimeMillis(), "image/jpeg", in); component.getClock().currentTimeMillis(), "image/jpeg", in);
} }
private void setMessageNotShared(BriarIntegrationTestComponent component,
MessageId messageId) throws Exception {
DatabaseComponent db = component.getDatabaseComponent();
db.transaction(false, txn -> db.setMessageNotShared(txn, messageId));
}
private void setMessageShared(BriarIntegrationTestComponent component,
MessageId messageId) throws Exception {
DatabaseComponent db = component.getDatabaseComponent();
db.transaction(false, txn -> db.setMessageShared(txn, messageId));
}
private boolean messageIsDeleted(BriarIntegrationTestComponent component, private boolean messageIsDeleted(BriarIntegrationTestComponent component,
MessageId messageId) throws DbException { MessageId messageId) throws DbException {
DatabaseComponent db = component.getDatabaseComponent(); DatabaseComponent db = component.getDatabaseComponent();
@@ -754,5 +740,4 @@ public class AutoDeleteIntegrationTest extends AbstractAutoDeleteTest {
return true; return true;
} }
} }
} }

View File

@@ -175,8 +175,10 @@ public class PrivateGroupIntegrationTest
// 1 reveals the contact relationship to 2 // 1 reveals the contact relationship to 2
assertNotNull(contactId2From1); assertNotNull(contactId2From1);
groupInvitationManager1.revealRelationship(contactId2From1, groupId0); groupInvitationManager1.revealRelationship(contactId2From1, groupId0);
sync1To2(1, true); sync1To2(1, true); // 1 sends an invitation protocol join message
sync2To1(1, true); // 2 sends an invitation protocol join message and three private group
// protocol join messages, which 1 has already seen
syncMessage(c2, c1, contactId1From2, 1, 3, 0, 1);
// their relationship is now revealed // their relationship is now revealed
assertEquals(REVEALED_BY_US, assertEquals(REVEALED_BY_US,

View File

@@ -783,8 +783,8 @@ public class ForumSharingIntegrationTest
// invitee accepts // invitee accepts
respondToRequest(contactId0From1, true); respondToRequest(contactId0From1, true);
// sync response back // sync response and both posts back (0 has already seen the posts)
sync1To0(1, true); syncMessage(c1, c0, contactId0From1, 1, 2, 0, 1);
eventWaiter.await(TIMEOUT, 1); eventWaiter.await(TIMEOUT, 1);
assertResponseReceived(listener0, contactId1From0, true); assertResponseReceived(listener0, contactId1From0, true);

View File

@@ -15,6 +15,7 @@ import org.briarproject.bramble.api.data.BdfStringUtils;
import org.briarproject.bramble.api.db.DatabaseComponent; import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DbException; import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.event.Event; import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.event.EventListener; import org.briarproject.bramble.api.event.EventListener;
import org.briarproject.bramble.api.identity.Identity; import org.briarproject.bramble.api.identity.Identity;
import org.briarproject.bramble.api.identity.IdentityManager; import org.briarproject.bramble.api.identity.IdentityManager;
@@ -26,6 +27,7 @@ import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.sync.MessageId; import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.event.MessageStateChangedEvent; import org.briarproject.bramble.api.sync.event.MessageStateChangedEvent;
import org.briarproject.bramble.api.sync.event.MessagesAckedEvent; import org.briarproject.bramble.api.sync.event.MessagesAckedEvent;
import org.briarproject.bramble.api.sync.event.MessagesSentEvent;
import org.briarproject.bramble.test.TestTransportConnectionReader; import org.briarproject.bramble.test.TestTransportConnectionReader;
import org.briarproject.bramble.test.TestTransportConnectionWriter; import org.briarproject.bramble.test.TestTransportConnectionWriter;
import org.briarproject.bramble.test.TestUtils; import org.briarproject.bramble.test.TestUtils;
@@ -44,16 +46,21 @@ import org.junit.Before;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.File; import java.io.File;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.inject.Inject; import javax.inject.Inject;
import static java.util.concurrent.Executors.newSingleThreadExecutor; import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.logging.Level.WARNING; import static java.util.logging.Level.WARNING;
import static java.util.logging.Logger.getLogger; import static java.util.logging.Logger.getLogger;
import static junit.framework.Assert.assertNotNull; import static junit.framework.Assert.assertNotNull;
@@ -398,14 +405,28 @@ public abstract class BriarIntegrationTest<C extends BriarIntegrationTestCompone
protected void syncMessage(BriarIntegrationTestComponent fromComponent, protected void syncMessage(BriarIntegrationTestComponent fromComponent,
BriarIntegrationTestComponent toComponent, ContactId toId, int num, BriarIntegrationTestComponent toComponent, ContactId toId, int num,
boolean valid) throws Exception { boolean valid) throws Exception {
syncMessage(fromComponent, toComponent, toId, num, 0, valid ? 0 : num,
valid ? num : 0);
}
protected void syncMessage(BriarIntegrationTestComponent fromComponent,
BriarIntegrationTestComponent toComponent, ContactId toId,
int numNew, int numDupes, int numPendingOrInvalid, int numDelivered)
throws Exception {
// Debug output // Debug output
String from = String from =
fromComponent.getIdentityManager().getLocalAuthor().getName(); fromComponent.getIdentityManager().getLocalAuthor().getName();
String to = toComponent.getIdentityManager().getLocalAuthor().getName(); String to = toComponent.getIdentityManager().getLocalAuthor().getName();
LOG.info("TEST: Sending " + num + " message(s) from " + from + " to " + LOG.info("TEST: Sending " + (numNew + numDupes) + " message(s) from "
to); + from + " to " + to);
// Listen for messages being sent
waitForEvents(fromComponent);
SendListener sendListener = new SendListener();
fromComponent.getEventBus().addListener(sendListener);
// Write the messages to a transport stream
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
TestTransportConnectionWriter writer = TestTransportConnectionWriter writer =
new TestTransportConnectionWriter(out); new TestTransportConnectionWriter(out);
@@ -413,23 +434,33 @@ public abstract class BriarIntegrationTest<C extends BriarIntegrationTestCompone
SIMPLEX_TRANSPORT_ID, writer); SIMPLEX_TRANSPORT_ID, writer);
writer.getDisposedLatch().await(TIMEOUT, MILLISECONDS); writer.getDisposedLatch().await(TIMEOUT, MILLISECONDS);
// Check that the expected number of messages were sent
waitForEvents(fromComponent);
fromComponent.getEventBus().removeListener(sendListener);
assertEquals("Messages sent", numNew + numDupes,
sendListener.sent.size());
// Read the messages from the transport stream
ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray()); ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
TestTransportConnectionReader reader = TestTransportConnectionReader reader =
new TestTransportConnectionReader(in); new TestTransportConnectionReader(in);
toComponent.getConnectionManager().manageIncomingConnection( toComponent.getConnectionManager().manageIncomingConnection(
SIMPLEX_TRANSPORT_ID, reader); SIMPLEX_TRANSPORT_ID, reader);
if (valid) { if (numPendingOrInvalid > 0) {
deliveryWaiter.await(TIMEOUT, num); validationWaiter.await(TIMEOUT, numPendingOrInvalid);
assertEquals("Messages delivered", num,
deliveryCounter.getAndSet(0));
} else {
validationWaiter.await(TIMEOUT, num);
assertEquals("Messages validated", num,
validationCounter.getAndSet(0));
} }
assertEquals("Messages validated", numPendingOrInvalid,
validationCounter.getAndSet(0));
if (numDelivered > 0) {
deliveryWaiter.await(TIMEOUT, numDelivered);
}
assertEquals("Messages delivered", numDelivered,
deliveryCounter.getAndSet(0));
try { try {
messageSemaphore.tryAcquire(num, TIMEOUT, MILLISECONDS); messageSemaphore.tryAcquire(numNew, TIMEOUT, MILLISECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.info("Interrupted while waiting for messages"); LOG.info("Interrupted while waiting for messages");
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@@ -448,6 +479,11 @@ public abstract class BriarIntegrationTest<C extends BriarIntegrationTestCompone
expectAck = true; expectAck = true;
// Listen for messages being sent (none should be sent)
waitForEvents(fromComponent);
SendListener sendListener = new SendListener();
fromComponent.getEventBus().addListener(sendListener);
// start outgoing connection // start outgoing connection
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
TestTransportConnectionWriter writer = TestTransportConnectionWriter writer =
@@ -456,6 +492,11 @@ public abstract class BriarIntegrationTest<C extends BriarIntegrationTestCompone
SIMPLEX_TRANSPORT_ID, writer); SIMPLEX_TRANSPORT_ID, writer);
writer.getDisposedLatch().await(TIMEOUT, MILLISECONDS); writer.getDisposedLatch().await(TIMEOUT, MILLISECONDS);
// Check that no messages were sent
waitForEvents(fromComponent);
fromComponent.getEventBus().removeListener(sendListener);
assertEquals("Messages sent", 0, sendListener.sent.size());
// handle incoming connection // handle incoming connection
ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray()); ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
TestTransportConnectionReader reader = TestTransportConnectionReader reader =
@@ -506,4 +547,55 @@ public abstract class BriarIntegrationTest<C extends BriarIntegrationTestCompone
txn -> autoDeleteManager.getAutoDeleteTimer(txn, contactId, txn -> autoDeleteManager.getAutoDeleteTimer(txn, contactId,
timestamp)); timestamp));
} }
protected void setMessageNotShared(BriarIntegrationTestComponent component,
MessageId messageId) throws Exception {
DatabaseComponent db = component.getDatabaseComponent();
db.transaction(false, txn -> db.setMessageNotShared(txn, messageId));
}
protected void setMessageShared(BriarIntegrationTestComponent component,
MessageId messageId) throws Exception {
DatabaseComponent db = component.getDatabaseComponent();
db.transaction(false, txn -> db.setMessageShared(txn, messageId));
}
/**
* Broadcasts a marker event and waits for it to be delivered, which
* indicates that all previously broadcast events have been delivered.
*/
protected void waitForEvents(BriarIntegrationTestComponent component)
throws Exception {
CountDownLatch latch = new CountDownLatch(1);
MarkerEvent marker = new MarkerEvent();
EventBus eventBus = component.getEventBus();
eventBus.addListener(new EventListener() {
@Override
public void eventOccurred(@Nonnull Event e) {
if (e == marker) {
latch.countDown();
eventBus.removeListener(this);
}
}
});
eventBus.broadcast(marker);
if (!latch.await(1, MINUTES)) fail();
}
private static class MarkerEvent extends Event {
}
private static class SendListener implements EventListener {
private final Set<MessageId> sent = new HashSet<>();
@Override
public void eventOccurred(Event e) {
if (e instanceof MessagesSentEvent) {
sent.addAll(((MessagesSentEvent) e).getMessageIds());
}
}
}
} }