Bring protocols in line with spec

This commit is contained in:
Torsten Grote
2016-12-14 13:00:43 -02:00
parent cc5c000278
commit 501980d8fe
34 changed files with 320 additions and 281 deletions

View File

@@ -63,6 +63,7 @@ import static org.briarproject.bramble.api.identity.AuthorConstants.MAX_PUBLIC_K
import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE;
import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED;
import static org.briarproject.bramble.api.sync.Group.Visibility.VISIBLE;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_GROUP_DESCRIPTOR_LENGTH;
import static org.briarproject.bramble.api.sync.ValidationManager.State.DELIVERED;
import static org.briarproject.bramble.api.sync.ValidationManager.State.UNKNOWN;
import static org.briarproject.bramble.api.transport.TransportConstants.REORDERING_WINDOW_SIZE;
@@ -95,7 +96,7 @@ public class DatabaseComponentImplTest extends BrambleTestCase {
public DatabaseComponentImplTest() {
clientId = new ClientId(TestUtils.getRandomString(5));
groupId = new GroupId(TestUtils.getRandomId());
byte[] descriptor = new byte[0];
byte[] descriptor = new byte[MAX_GROUP_DESCRIPTOR_LENGTH];
group = new Group(groupId, clientId, descriptor);
authorId = new AuthorId(TestUtils.getRandomId());
author = new Author(authorId, "Alice", new byte[MAX_PUBLIC_KEY_LENGTH]);

View File

@@ -47,6 +47,7 @@ import static org.briarproject.bramble.api.identity.AuthorConstants.MAX_PUBLIC_K
import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE;
import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED;
import static org.briarproject.bramble.api.sync.Group.Visibility.VISIBLE;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_GROUP_DESCRIPTOR_LENGTH;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_LENGTH;
import static org.briarproject.bramble.api.sync.ValidationManager.State.DELIVERED;
import static org.briarproject.bramble.api.sync.ValidationManager.State.INVALID;
@@ -84,7 +85,7 @@ public class H2DatabaseTest extends BrambleTestCase {
public H2DatabaseTest() throws Exception {
groupId = new GroupId(TestUtils.getRandomId());
clientId = new ClientId(TestUtils.getRandomString(5));
byte[] descriptor = new byte[0];
byte[] descriptor = new byte[MAX_GROUP_DESCRIPTOR_LENGTH];
group = new Group(groupId, clientId, descriptor);
AuthorId authorId = new AuthorId(TestUtils.getRandomId());
author = new Author(authorId, "Alice", new byte[MAX_PUBLIC_KEY_LENGTH]);
@@ -1316,7 +1317,7 @@ public class H2DatabaseTest extends BrambleTestCase {
// Add a second group
GroupId groupId1 = new GroupId(TestUtils.getRandomId());
Group group1 = new Group(groupId1, clientId,
TestUtils.getRandomBytes(42));
TestUtils.getRandomBytes(MAX_GROUP_DESCRIPTOR_LENGTH));
db.addGroup(txn, group1);
// Add a message to the second group

View File

@@ -92,7 +92,7 @@ public class KeyAgreementProtocolTest extends BrambleTestCase {
oneOf(callbacks).connectionWaiting();
oneOf(transport).receiveKey();
will(returnValue(BOB_PUBKEY));
oneOf(callbacks).initialPacketReceived();
oneOf(callbacks).initialRecordReceived();
// Alice verifies Bob's public key
oneOf(crypto).deriveKeyCommitment(BOB_PUBKEY);
@@ -152,7 +152,7 @@ public class KeyAgreementProtocolTest extends BrambleTestCase {
// Bob receives Alice's public key
oneOf(transport).receiveKey();
will(returnValue(ALICE_PUBKEY));
oneOf(callbacks).initialPacketReceived();
oneOf(callbacks).initialRecordReceived();
// Bob verifies Alice's public key
oneOf(crypto).deriveKeyCommitment(ALICE_PUBKEY);
@@ -213,7 +213,7 @@ public class KeyAgreementProtocolTest extends BrambleTestCase {
oneOf(callbacks).connectionWaiting();
oneOf(transport).receiveKey();
will(returnValue(BAD_PUBKEY));
oneOf(callbacks).initialPacketReceived();
oneOf(callbacks).initialRecordReceived();
// Alice verifies Bob's public key
oneOf(crypto).deriveKeyCommitment(BAD_PUBKEY);
@@ -250,7 +250,7 @@ public class KeyAgreementProtocolTest extends BrambleTestCase {
// Bob receives a bad public key
oneOf(transport).receiveKey();
will(returnValue(BAD_PUBKEY));
oneOf(callbacks).initialPacketReceived();
oneOf(callbacks).initialRecordReceived();
// Bob verifies Alice's public key
oneOf(crypto).deriveKeyCommitment(BAD_PUBKEY);
@@ -296,7 +296,7 @@ public class KeyAgreementProtocolTest extends BrambleTestCase {
oneOf(callbacks).connectionWaiting();
oneOf(transport).receiveKey();
will(returnValue(BOB_PUBKEY));
oneOf(callbacks).initialPacketReceived();
oneOf(callbacks).initialRecordReceived();
// Alice verifies Bob's public key
oneOf(crypto).deriveKeyCommitment(BOB_PUBKEY);
@@ -357,7 +357,7 @@ public class KeyAgreementProtocolTest extends BrambleTestCase {
// Bob receives Alice's public key
oneOf(transport).receiveKey();
will(returnValue(ALICE_PUBKEY));
oneOf(callbacks).initialPacketReceived();
oneOf(callbacks).initialRecordReceived();
// Bob verifies Alice's public key
oneOf(crypto).deriveKeyCommitment(ALICE_PUBKEY);

View File

@@ -10,20 +10,20 @@ import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import static org.briarproject.bramble.api.sync.PacketTypes.ACK;
import static org.briarproject.bramble.api.sync.PacketTypes.OFFER;
import static org.briarproject.bramble.api.sync.PacketTypes.REQUEST;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_PACKET_PAYLOAD_LENGTH;
import static org.briarproject.bramble.api.sync.SyncConstants.PACKET_HEADER_LENGTH;
import static org.briarproject.bramble.api.sync.RecordTypes.ACK;
import static org.briarproject.bramble.api.sync.RecordTypes.OFFER;
import static org.briarproject.bramble.api.sync.RecordTypes.REQUEST;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_RECORD_PAYLOAD_LENGTH;
import static org.briarproject.bramble.api.sync.SyncConstants.RECORD_HEADER_LENGTH;
import static org.junit.Assert.assertEquals;
public class PacketReaderImplTest extends BrambleTestCase {
public class RecordReaderImplTest extends BrambleTestCase {
@Test(expected = FormatException.class)
public void testFormatExceptionIfAckIsTooLarge() throws Exception {
byte[] b = createAck(true);
ByteArrayInputStream in = new ByteArrayInputStream(b);
PacketReaderImpl reader = new PacketReaderImpl(null, in);
RecordReaderImpl reader = new RecordReaderImpl(null, in);
reader.readAck();
}
@@ -31,7 +31,7 @@ public class PacketReaderImplTest extends BrambleTestCase {
public void testNoFormatExceptionIfAckIsMaximumSize() throws Exception {
byte[] b = createAck(false);
ByteArrayInputStream in = new ByteArrayInputStream(b);
PacketReaderImpl reader = new PacketReaderImpl(null, in);
RecordReaderImpl reader = new RecordReaderImpl(null, in);
reader.readAck();
}
@@ -39,7 +39,7 @@ public class PacketReaderImplTest extends BrambleTestCase {
public void testEmptyAck() throws Exception {
byte[] b = createEmptyAck();
ByteArrayInputStream in = new ByteArrayInputStream(b);
PacketReaderImpl reader = new PacketReaderImpl(null, in);
RecordReaderImpl reader = new RecordReaderImpl(null, in);
reader.readAck();
}
@@ -47,7 +47,7 @@ public class PacketReaderImplTest extends BrambleTestCase {
public void testFormatExceptionIfOfferIsTooLarge() throws Exception {
byte[] b = createOffer(true);
ByteArrayInputStream in = new ByteArrayInputStream(b);
PacketReaderImpl reader = new PacketReaderImpl(null, in);
RecordReaderImpl reader = new RecordReaderImpl(null, in);
reader.readOffer();
}
@@ -55,7 +55,7 @@ public class PacketReaderImplTest extends BrambleTestCase {
public void testNoFormatExceptionIfOfferIsMaximumSize() throws Exception {
byte[] b = createOffer(false);
ByteArrayInputStream in = new ByteArrayInputStream(b);
PacketReaderImpl reader = new PacketReaderImpl(null, in);
RecordReaderImpl reader = new RecordReaderImpl(null, in);
reader.readOffer();
}
@@ -63,7 +63,7 @@ public class PacketReaderImplTest extends BrambleTestCase {
public void testEmptyOffer() throws Exception {
byte[] b = createEmptyOffer();
ByteArrayInputStream in = new ByteArrayInputStream(b);
PacketReaderImpl reader = new PacketReaderImpl(null, in);
RecordReaderImpl reader = new RecordReaderImpl(null, in);
reader.readOffer();
}
@@ -71,7 +71,7 @@ public class PacketReaderImplTest extends BrambleTestCase {
public void testFormatExceptionIfRequestIsTooLarge() throws Exception {
byte[] b = createRequest(true);
ByteArrayInputStream in = new ByteArrayInputStream(b);
PacketReaderImpl reader = new PacketReaderImpl(null, in);
RecordReaderImpl reader = new RecordReaderImpl(null, in);
reader.readRequest();
}
@@ -79,7 +79,7 @@ public class PacketReaderImplTest extends BrambleTestCase {
public void testNoFormatExceptionIfRequestIsMaximumSize() throws Exception {
byte[] b = createRequest(false);
ByteArrayInputStream in = new ByteArrayInputStream(b);
PacketReaderImpl reader = new PacketReaderImpl(null, in);
RecordReaderImpl reader = new RecordReaderImpl(null, in);
reader.readRequest();
}
@@ -87,76 +87,76 @@ public class PacketReaderImplTest extends BrambleTestCase {
public void testEmptyRequest() throws Exception {
byte[] b = createEmptyRequest();
ByteArrayInputStream in = new ByteArrayInputStream(b);
PacketReaderImpl reader = new PacketReaderImpl(null, in);
RecordReaderImpl reader = new RecordReaderImpl(null, in);
reader.readRequest();
}
private byte[] createAck(boolean tooBig) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
out.write(new byte[PACKET_HEADER_LENGTH]);
while (out.size() + UniqueId.LENGTH <= PACKET_HEADER_LENGTH
+ MAX_PACKET_PAYLOAD_LENGTH) {
out.write(new byte[RECORD_HEADER_LENGTH]);
while (out.size() + UniqueId.LENGTH <= RECORD_HEADER_LENGTH
+ MAX_RECORD_PAYLOAD_LENGTH) {
out.write(TestUtils.getRandomId());
}
if (tooBig) out.write(TestUtils.getRandomId());
assertEquals(tooBig, out.size() > PACKET_HEADER_LENGTH +
MAX_PACKET_PAYLOAD_LENGTH);
byte[] packet = out.toByteArray();
packet[1] = ACK;
ByteUtils.writeUint16(packet.length - PACKET_HEADER_LENGTH, packet, 2);
return packet;
assertEquals(tooBig, out.size() > RECORD_HEADER_LENGTH +
MAX_RECORD_PAYLOAD_LENGTH);
byte[] record = out.toByteArray();
record[1] = ACK;
ByteUtils.writeUint16(record.length - RECORD_HEADER_LENGTH, record, 2);
return record;
}
private byte[] createEmptyAck() throws Exception {
byte[] packet = new byte[PACKET_HEADER_LENGTH];
packet[1] = ACK;
ByteUtils.writeUint16(packet.length - PACKET_HEADER_LENGTH, packet, 2);
return packet;
byte[] record = new byte[RECORD_HEADER_LENGTH];
record[1] = ACK;
ByteUtils.writeUint16(record.length - RECORD_HEADER_LENGTH, record, 2);
return record;
}
private byte[] createOffer(boolean tooBig) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
out.write(new byte[PACKET_HEADER_LENGTH]);
while (out.size() + UniqueId.LENGTH <= PACKET_HEADER_LENGTH
+ MAX_PACKET_PAYLOAD_LENGTH) {
out.write(new byte[RECORD_HEADER_LENGTH]);
while (out.size() + UniqueId.LENGTH <= RECORD_HEADER_LENGTH
+ MAX_RECORD_PAYLOAD_LENGTH) {
out.write(TestUtils.getRandomId());
}
if (tooBig) out.write(TestUtils.getRandomId());
assertEquals(tooBig, out.size() > PACKET_HEADER_LENGTH +
MAX_PACKET_PAYLOAD_LENGTH);
byte[] packet = out.toByteArray();
packet[1] = OFFER;
ByteUtils.writeUint16(packet.length - PACKET_HEADER_LENGTH, packet, 2);
return packet;
assertEquals(tooBig, out.size() > RECORD_HEADER_LENGTH +
MAX_RECORD_PAYLOAD_LENGTH);
byte[] record = out.toByteArray();
record[1] = OFFER;
ByteUtils.writeUint16(record.length - RECORD_HEADER_LENGTH, record, 2);
return record;
}
private byte[] createEmptyOffer() throws Exception {
byte[] packet = new byte[PACKET_HEADER_LENGTH];
packet[1] = OFFER;
ByteUtils.writeUint16(packet.length - PACKET_HEADER_LENGTH, packet, 2);
return packet;
byte[] record = new byte[RECORD_HEADER_LENGTH];
record[1] = OFFER;
ByteUtils.writeUint16(record.length - RECORD_HEADER_LENGTH, record, 2);
return record;
}
private byte[] createRequest(boolean tooBig) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
out.write(new byte[PACKET_HEADER_LENGTH]);
while (out.size() + UniqueId.LENGTH <= PACKET_HEADER_LENGTH
+ MAX_PACKET_PAYLOAD_LENGTH) {
out.write(new byte[RECORD_HEADER_LENGTH]);
while (out.size() + UniqueId.LENGTH <= RECORD_HEADER_LENGTH
+ MAX_RECORD_PAYLOAD_LENGTH) {
out.write(TestUtils.getRandomId());
}
if (tooBig) out.write(TestUtils.getRandomId());
assertEquals(tooBig, out.size() > PACKET_HEADER_LENGTH +
MAX_PACKET_PAYLOAD_LENGTH);
byte[] packet = out.toByteArray();
packet[1] = REQUEST;
ByteUtils.writeUint16(packet.length - PACKET_HEADER_LENGTH, packet, 2);
return packet;
assertEquals(tooBig, out.size() > RECORD_HEADER_LENGTH +
MAX_RECORD_PAYLOAD_LENGTH);
byte[] record = out.toByteArray();
record[1] = REQUEST;
ByteUtils.writeUint16(record.length - RECORD_HEADER_LENGTH, record, 2);
return record;
}
private byte[] createEmptyRequest() throws Exception {
byte[] packet = new byte[PACKET_HEADER_LENGTH];
packet[1] = REQUEST;
ByteUtils.writeUint16(packet.length - PACKET_HEADER_LENGTH, packet, 2);
return packet;
byte[] record = new byte[RECORD_HEADER_LENGTH];
record[1] = REQUEST;
ByteUtils.writeUint16(record.length - RECORD_HEADER_LENGTH, record, 2);
return record;
}
}

View File

@@ -6,10 +6,10 @@ import org.briarproject.bramble.api.db.Transaction;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.PacketWriter;
import org.briarproject.bramble.test.BrambleTestCase;
import org.briarproject.bramble.test.ImmediateExecutor;
import org.briarproject.bramble.test.TestUtils;
import org.briarproject.bramble.api.sync.RecordWriter;
import org.jmock.Expectations;
import org.jmock.Mockery;
import org.junit.Test;
@@ -29,14 +29,14 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase {
private final ContactId contactId;
private final MessageId messageId;
private final int maxLatency;
private final PacketWriter packetWriter;
private final RecordWriter recordWriter;
public SimplexOutgoingSessionTest() {
context = new Mockery();
db = context.mock(DatabaseComponent.class);
dbExecutor = new ImmediateExecutor();
eventBus = context.mock(EventBus.class);
packetWriter = context.mock(PacketWriter.class);
recordWriter = context.mock(RecordWriter.class);
contactId = new ContactId(234);
messageId = new MessageId(TestUtils.getRandomId());
maxLatency = Integer.MAX_VALUE;
@@ -45,7 +45,7 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase {
@Test
public void testNothingToSend() throws Exception {
final SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, maxLatency, packetWriter);
dbExecutor, eventBus, contactId, maxLatency, recordWriter);
final Transaction noAckTxn = new Transaction(null, false);
final Transaction noMsgTxn = new Transaction(null, false);
@@ -68,7 +68,7 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase {
oneOf(db).commitTransaction(noMsgTxn);
oneOf(db).endTransaction(noMsgTxn);
// Flush the output stream
oneOf(packetWriter).flush();
oneOf(recordWriter).flush();
// Remove listener
oneOf(eventBus).removeListener(session);
}});
@@ -83,7 +83,7 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase {
final Ack ack = new Ack(Collections.singletonList(messageId));
final byte[] raw = new byte[1234];
final SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, maxLatency, packetWriter);
dbExecutor, eventBus, contactId, maxLatency, recordWriter);
final Transaction ackTxn = new Transaction(null, false);
final Transaction noAckTxn = new Transaction(null, false);
final Transaction msgTxn = new Transaction(null, false);
@@ -99,7 +99,7 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase {
will(returnValue(ack));
oneOf(db).commitTransaction(ackTxn);
oneOf(db).endTransaction(ackTxn);
oneOf(packetWriter).writeAck(ack);
oneOf(recordWriter).writeAck(ack);
// One message to send
oneOf(db).startTransaction(false);
will(returnValue(msgTxn));
@@ -108,7 +108,7 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase {
will(returnValue(Arrays.asList(raw)));
oneOf(db).commitTransaction(msgTxn);
oneOf(db).endTransaction(msgTxn);
oneOf(packetWriter).writeMessage(raw);
oneOf(recordWriter).writeMessage(raw);
// No more acks
oneOf(db).startTransaction(false);
will(returnValue(noAckTxn));
@@ -125,7 +125,7 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase {
oneOf(db).commitTransaction(noMsgTxn);
oneOf(db).endTransaction(noMsgTxn);
// Flush the output stream
oneOf(packetWriter).flush();
oneOf(recordWriter).flush();
// Remove listener
oneOf(eventBus).removeListener(session);
}});

View File

@@ -12,10 +12,10 @@ import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.Offer;
import org.briarproject.bramble.api.sync.PacketReader;
import org.briarproject.bramble.api.sync.PacketReaderFactory;
import org.briarproject.bramble.api.sync.PacketWriter;
import org.briarproject.bramble.api.sync.PacketWriterFactory;
import org.briarproject.bramble.api.sync.RecordReader;
import org.briarproject.bramble.api.sync.RecordReaderFactory;
import org.briarproject.bramble.api.sync.RecordWriter;
import org.briarproject.bramble.api.sync.RecordWriterFactory;
import org.briarproject.bramble.api.sync.Request;
import org.briarproject.bramble.api.transport.StreamContext;
import org.briarproject.bramble.api.transport.StreamReaderFactory;
@@ -33,6 +33,7 @@ import java.util.Collection;
import javax.inject.Inject;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_GROUP_DESCRIPTOR_LENGTH;
import static org.briarproject.bramble.api.transport.TransportConstants.TAG_LENGTH;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -50,9 +51,9 @@ public class SyncIntegrationTest extends BrambleTestCase {
@Inject
StreamWriterFactory streamWriterFactory;
@Inject
PacketReaderFactory packetReaderFactory;
RecordReaderFactory recordReaderFactory;
@Inject
PacketWriterFactory packetWriterFactory;
RecordWriterFactory recordWriterFactory;
@Inject
CryptoComponent crypto;
@@ -77,7 +78,7 @@ public class SyncIntegrationTest extends BrambleTestCase {
streamNumber = 123;
// Create a group
ClientId clientId = new ClientId(TestUtils.getRandomString(5));
byte[] descriptor = new byte[0];
byte[] descriptor = new byte[MAX_GROUP_DESCRIPTOR_LENGTH];
Group group = groupFactory.createGroup(clientId, descriptor);
// Add two messages to the group
long timestamp = System.currentTimeMillis();
@@ -98,14 +99,14 @@ public class SyncIntegrationTest extends BrambleTestCase {
headerKey, streamNumber);
OutputStream streamWriter = streamWriterFactory.createStreamWriter(out,
ctx);
PacketWriter packetWriter = packetWriterFactory.createPacketWriter(
RecordWriter recordWriter = recordWriterFactory.createRecordWriter(
streamWriter);
packetWriter.writeAck(new Ack(messageIds));
packetWriter.writeMessage(message.getRaw());
packetWriter.writeMessage(message1.getRaw());
packetWriter.writeOffer(new Offer(messageIds));
packetWriter.writeRequest(new Request(messageIds));
recordWriter.writeAck(new Ack(messageIds));
recordWriter.writeMessage(message.getRaw());
recordWriter.writeMessage(message1.getRaw());
recordWriter.writeOffer(new Offer(messageIds));
recordWriter.writeRequest(new Request(messageIds));
streamWriter.flush();
return out.toByteArray();
@@ -127,31 +128,31 @@ public class SyncIntegrationTest extends BrambleTestCase {
headerKey, streamNumber);
InputStream streamReader = streamReaderFactory.createStreamReader(in,
ctx);
PacketReader packetReader = packetReaderFactory.createPacketReader(
RecordReader recordReader = recordReaderFactory.createRecordReader(
streamReader);
// Read the ack
assertTrue(packetReader.hasAck());
Ack a = packetReader.readAck();
assertTrue(recordReader.hasAck());
Ack a = recordReader.readAck();
assertEquals(messageIds, a.getMessageIds());
// Read the messages
assertTrue(packetReader.hasMessage());
Message m = packetReader.readMessage();
assertTrue(recordReader.hasMessage());
Message m = recordReader.readMessage();
checkMessageEquality(message, m);
assertTrue(packetReader.hasMessage());
m = packetReader.readMessage();
assertTrue(recordReader.hasMessage());
m = recordReader.readMessage();
checkMessageEquality(message1, m);
assertFalse(packetReader.hasMessage());
assertFalse(recordReader.hasMessage());
// Read the offer
assertTrue(packetReader.hasOffer());
Offer o = packetReader.readOffer();
assertTrue(recordReader.hasOffer());
Offer o = recordReader.readOffer();
assertEquals(messageIds, o.getMessageIds());
// Read the request
assertTrue(packetReader.hasRequest());
Request req = packetReader.readRequest();
assertTrue(recordReader.hasRequest());
Request req = recordReader.readRequest();
assertEquals(messageIds, req.getMessageIds());
in.close();