diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncConstants.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncConstants.java index a6e3474f3..4a4200872 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncConstants.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncConstants.java @@ -35,4 +35,9 @@ public interface SyncConstants { * The maximum number of message IDs in an ack, offer or request record. */ int MAX_MESSAGE_IDS = MAX_RECORD_PAYLOAD_BYTES / UniqueId.LENGTH; + + /** + * The maximum length of a message block in bytes. + */ + int MAX_BLOCK_LENGTH = 32 * 2014; // 32 KiB } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/BrambleCoreModule.java b/bramble-core/src/main/java/org/briarproject/bramble/BrambleCoreModule.java index 44535a50a..6dee021a8 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/BrambleCoreModule.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/BrambleCoreModule.java @@ -9,6 +9,7 @@ import org.briarproject.bramble.db.DatabaseExecutorModule; import org.briarproject.bramble.db.DatabaseModule; import org.briarproject.bramble.event.EventModule; import org.briarproject.bramble.identity.IdentityModule; +import org.briarproject.bramble.io.IoModule; import org.briarproject.bramble.keyagreement.KeyAgreementModule; import org.briarproject.bramble.lifecycle.LifecycleModule; import org.briarproject.bramble.plugin.PluginModule; @@ -36,6 +37,7 @@ import dagger.Module; DatabaseExecutorModule.class, EventModule.class, IdentityModule.class, + IoModule.class, KeyAgreementModule.class, LifecycleModule.class, PluginModule.class, diff --git a/bramble-core/src/main/java/org/briarproject/bramble/io/IoModule.java b/bramble-core/src/main/java/org/briarproject/bramble/io/IoModule.java new file mode 100644 index 000000000..8e996766d --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/io/IoModule.java @@ -0,0 +1,16 @@ +package org.briarproject.bramble.io; + +import org.briarproject.bramble.api.io.MessageInputStreamFactory; + +import dagger.Module; +import dagger.Provides; + +@Module +public class IoModule { + + @Provides + MessageInputStreamFactory provideMessageInputStreamFactory( + MessageInputStreamFactoryImpl messageInputStreamFactory) { + return messageInputStreamFactory; + } +} diff --git a/bramble-core/src/main/java/org/briarproject/bramble/io/MessageInputStreamFactoryImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/io/MessageInputStreamFactoryImpl.java new file mode 100644 index 000000000..16d420d5d --- /dev/null +++ b/bramble-core/src/main/java/org/briarproject/bramble/io/MessageInputStreamFactoryImpl.java @@ -0,0 +1,32 @@ +package org.briarproject.bramble.io; + +import org.briarproject.bramble.api.db.DatabaseExecutor; +import org.briarproject.bramble.api.io.BlockSource; +import org.briarproject.bramble.api.io.MessageInputStreamFactory; +import org.briarproject.bramble.api.sync.MessageId; + +import java.io.InputStream; +import java.util.concurrent.Executor; + +import javax.inject.Inject; + +import static org.briarproject.bramble.api.sync.SyncConstants.MAX_BLOCK_LENGTH; + +class MessageInputStreamFactoryImpl implements MessageInputStreamFactory { + + private final Executor dbExecutor; + private final BlockSource blockSource; + + @Inject + MessageInputStreamFactoryImpl(@DatabaseExecutor Executor dbExecutor, + BlockSource blockSource) { + this.dbExecutor = dbExecutor; + this.blockSource = blockSource; + } + + @Override + public InputStream getMessageInputStream(MessageId m) { + return new BlockSourceInputStream(MAX_BLOCK_LENGTH, dbExecutor, + blockSource, m); + } +} diff --git a/bramble-core/src/test/java/org/briarproject/bramble/io/BlockSourceInputStreamTest.java b/bramble-core/src/test/java/org/briarproject/bramble/io/BlockSourceInputStreamTest.java index 5f3c7539a..8675cf15b 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/io/BlockSourceInputStreamTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/io/BlockSourceInputStreamTest.java @@ -15,6 +15,7 @@ import java.util.Random; import java.util.concurrent.Executor; import static java.util.concurrent.Executors.newSingleThreadExecutor; +import static org.briarproject.bramble.api.sync.SyncConstants.MAX_BLOCK_LENGTH; import static org.briarproject.bramble.test.TestUtils.getRandomId; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -25,8 +26,6 @@ public class BlockSourceInputStreamTest extends BrambleMockTestCase { private static final int MAX_DATA_BYTES = 1_000_000; private static final int READ_BUFFER_BYTES = 4 * 1024; - private static final int BLOCK_BYTES = 32 * 1024; - private static final int MIN_BUFFER_BYTES = 32 * 1024; private final BlockSource blockSource; @@ -42,8 +41,8 @@ public class BlockSourceInputStreamTest extends BrambleMockTestCase { @Test public void testReadSingleBytes() throws IOException { byte[] data = createRandomData(); - BlockSource source = new ByteArrayBlockSource(data, BLOCK_BYTES); - InputStream in = new BlockSourceInputStream(MIN_BUFFER_BYTES, executor, + BlockSource source = new ByteArrayBlockSource(data, MAX_BLOCK_LENGTH); + InputStream in = new BlockSourceInputStream(MAX_BLOCK_LENGTH, executor, source, messageId); ByteArrayOutputStream out = new ByteArrayOutputStream(); //noinspection ForLoopReplaceableByForEach @@ -62,8 +61,8 @@ public class BlockSourceInputStreamTest extends BrambleMockTestCase { @Test public void testReadByteArrays() throws IOException { byte[] data = createRandomData(); - BlockSource source = new ByteArrayBlockSource(data, BLOCK_BYTES); - InputStream in = new BlockSourceInputStream(MIN_BUFFER_BYTES, executor, + BlockSource source = new ByteArrayBlockSource(data, MAX_BLOCK_LENGTH); + InputStream in = new BlockSourceInputStream(MAX_BLOCK_LENGTH, executor, source, messageId); ByteArrayOutputStream out = new ByteArrayOutputStream(); byte[] buf = new byte[READ_BUFFER_BYTES]; @@ -93,7 +92,7 @@ public class BlockSourceInputStreamTest extends BrambleMockTestCase { will(throwException(new DbException())); }}); - InputStream in = new BlockSourceInputStream(MIN_BUFFER_BYTES, executor, + InputStream in = new BlockSourceInputStream(MAX_BLOCK_LENGTH, executor, blockSource, messageId); //noinspection ResultOfMethodCallIgnored in.read(); @@ -108,7 +107,7 @@ public class BlockSourceInputStreamTest extends BrambleMockTestCase { will(throwException(new DbException())); }}); - InputStream in = new BlockSourceInputStream(MIN_BUFFER_BYTES, executor, + InputStream in = new BlockSourceInputStream(MAX_BLOCK_LENGTH, executor, blockSource, messageId); //noinspection ResultOfMethodCallIgnored in.read(); @@ -116,12 +115,12 @@ public class BlockSourceInputStreamTest extends BrambleMockTestCase { @Test public void testReadFullBlockAtEndOfMessage() throws Exception { - testReadBlockAtEndOfMessage(BLOCK_BYTES); + testReadBlockAtEndOfMessage(MAX_BLOCK_LENGTH); } @Test public void testReadPartialBlockAtEndOfMessage() throws Exception { - testReadBlockAtEndOfMessage(BLOCK_BYTES - 1); + testReadBlockAtEndOfMessage(MAX_BLOCK_LENGTH - 1); } private void testReadBlockAtEndOfMessage(int blockLength) throws Exception { @@ -135,9 +134,9 @@ public class BlockSourceInputStreamTest extends BrambleMockTestCase { will(returnValue(block)); }}); - InputStream in = new BlockSourceInputStream(MIN_BUFFER_BYTES, executor, + InputStream in = new BlockSourceInputStream(MAX_BLOCK_LENGTH, executor, blockSource, messageId); - byte[] buf = new byte[BLOCK_BYTES * 2]; + byte[] buf = new byte[MAX_BLOCK_LENGTH * 2]; assertEquals(block.length, in.read(buf, 0, buf.length)); assertArrayEquals(block, copyOfRange(buf, 0, block.length)); assertEquals(-1, in.read(buf, 0, buf.length));