Implement MessageInputStreamFactory interface.

This commit is contained in:
akwizgran
2018-12-06 14:05:36 +00:00
parent eea453fc5f
commit 3487a7cfee
5 changed files with 66 additions and 12 deletions

View File

@@ -35,4 +35,9 @@ public interface SyncConstants {
* The maximum number of message IDs in an ack, offer or request record.
*/
int MAX_MESSAGE_IDS = MAX_RECORD_PAYLOAD_BYTES / UniqueId.LENGTH;
/**
* The maximum length of a message block in bytes.
*/
int MAX_BLOCK_LENGTH = 32 * 2014; // 32 KiB
}

View File

@@ -9,6 +9,7 @@ import org.briarproject.bramble.db.DatabaseExecutorModule;
import org.briarproject.bramble.db.DatabaseModule;
import org.briarproject.bramble.event.EventModule;
import org.briarproject.bramble.identity.IdentityModule;
import org.briarproject.bramble.io.IoModule;
import org.briarproject.bramble.keyagreement.KeyAgreementModule;
import org.briarproject.bramble.lifecycle.LifecycleModule;
import org.briarproject.bramble.plugin.PluginModule;
@@ -36,6 +37,7 @@ import dagger.Module;
DatabaseExecutorModule.class,
EventModule.class,
IdentityModule.class,
IoModule.class,
KeyAgreementModule.class,
LifecycleModule.class,
PluginModule.class,

View File

@@ -0,0 +1,16 @@
package org.briarproject.bramble.io;
import org.briarproject.bramble.api.io.MessageInputStreamFactory;
import dagger.Module;
import dagger.Provides;
@Module
public class IoModule {
@Provides
MessageInputStreamFactory provideMessageInputStreamFactory(
MessageInputStreamFactoryImpl messageInputStreamFactory) {
return messageInputStreamFactory;
}
}

View File

@@ -0,0 +1,32 @@
package org.briarproject.bramble.io;
import org.briarproject.bramble.api.db.DatabaseExecutor;
import org.briarproject.bramble.api.io.BlockSource;
import org.briarproject.bramble.api.io.MessageInputStreamFactory;
import org.briarproject.bramble.api.sync.MessageId;
import java.io.InputStream;
import java.util.concurrent.Executor;
import javax.inject.Inject;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_BLOCK_LENGTH;
class MessageInputStreamFactoryImpl implements MessageInputStreamFactory {
private final Executor dbExecutor;
private final BlockSource blockSource;
@Inject
MessageInputStreamFactoryImpl(@DatabaseExecutor Executor dbExecutor,
BlockSource blockSource) {
this.dbExecutor = dbExecutor;
this.blockSource = blockSource;
}
@Override
public InputStream getMessageInputStream(MessageId m) {
return new BlockSourceInputStream(MAX_BLOCK_LENGTH, dbExecutor,
blockSource, m);
}
}

View File

@@ -15,6 +15,7 @@ import java.util.Random;
import java.util.concurrent.Executor;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_BLOCK_LENGTH;
import static org.briarproject.bramble.test.TestUtils.getRandomId;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -25,8 +26,6 @@ public class BlockSourceInputStreamTest extends BrambleMockTestCase {
private static final int MAX_DATA_BYTES = 1_000_000;
private static final int READ_BUFFER_BYTES = 4 * 1024;
private static final int BLOCK_BYTES = 32 * 1024;
private static final int MIN_BUFFER_BYTES = 32 * 1024;
private final BlockSource blockSource;
@@ -42,8 +41,8 @@ public class BlockSourceInputStreamTest extends BrambleMockTestCase {
@Test
public void testReadSingleBytes() throws IOException {
byte[] data = createRandomData();
BlockSource source = new ByteArrayBlockSource(data, BLOCK_BYTES);
InputStream in = new BlockSourceInputStream(MIN_BUFFER_BYTES, executor,
BlockSource source = new ByteArrayBlockSource(data, MAX_BLOCK_LENGTH);
InputStream in = new BlockSourceInputStream(MAX_BLOCK_LENGTH, executor,
source, messageId);
ByteArrayOutputStream out = new ByteArrayOutputStream();
//noinspection ForLoopReplaceableByForEach
@@ -62,8 +61,8 @@ public class BlockSourceInputStreamTest extends BrambleMockTestCase {
@Test
public void testReadByteArrays() throws IOException {
byte[] data = createRandomData();
BlockSource source = new ByteArrayBlockSource(data, BLOCK_BYTES);
InputStream in = new BlockSourceInputStream(MIN_BUFFER_BYTES, executor,
BlockSource source = new ByteArrayBlockSource(data, MAX_BLOCK_LENGTH);
InputStream in = new BlockSourceInputStream(MAX_BLOCK_LENGTH, executor,
source, messageId);
ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] buf = new byte[READ_BUFFER_BYTES];
@@ -93,7 +92,7 @@ public class BlockSourceInputStreamTest extends BrambleMockTestCase {
will(throwException(new DbException()));
}});
InputStream in = new BlockSourceInputStream(MIN_BUFFER_BYTES, executor,
InputStream in = new BlockSourceInputStream(MAX_BLOCK_LENGTH, executor,
blockSource, messageId);
//noinspection ResultOfMethodCallIgnored
in.read();
@@ -108,7 +107,7 @@ public class BlockSourceInputStreamTest extends BrambleMockTestCase {
will(throwException(new DbException()));
}});
InputStream in = new BlockSourceInputStream(MIN_BUFFER_BYTES, executor,
InputStream in = new BlockSourceInputStream(MAX_BLOCK_LENGTH, executor,
blockSource, messageId);
//noinspection ResultOfMethodCallIgnored
in.read();
@@ -116,12 +115,12 @@ public class BlockSourceInputStreamTest extends BrambleMockTestCase {
@Test
public void testReadFullBlockAtEndOfMessage() throws Exception {
testReadBlockAtEndOfMessage(BLOCK_BYTES);
testReadBlockAtEndOfMessage(MAX_BLOCK_LENGTH);
}
@Test
public void testReadPartialBlockAtEndOfMessage() throws Exception {
testReadBlockAtEndOfMessage(BLOCK_BYTES - 1);
testReadBlockAtEndOfMessage(MAX_BLOCK_LENGTH - 1);
}
private void testReadBlockAtEndOfMessage(int blockLength) throws Exception {
@@ -135,9 +134,9 @@ public class BlockSourceInputStreamTest extends BrambleMockTestCase {
will(returnValue(block));
}});
InputStream in = new BlockSourceInputStream(MIN_BUFFER_BYTES, executor,
InputStream in = new BlockSourceInputStream(MAX_BLOCK_LENGTH, executor,
blockSource, messageId);
byte[] buf = new byte[BLOCK_BYTES * 2];
byte[] buf = new byte[MAX_BLOCK_LENGTH * 2];
assertEquals(block.length, in.read(buf, 0, buf.length));
assertArrayEquals(block, copyOfRange(buf, 0, block.length));
assertEquals(-1, in.read(buf, 0, buf.length));