Compare commits

..

5 Commits

Author SHA1 Message Date
akwizgran
707066f1a1 Add new module to integration tests. 2018-12-06 18:02:48 +00:00
akwizgran
65e5dc266f Hook up MessagingManager to MessageInputStreamFactory. 2018-12-06 17:46:14 +00:00
akwizgran
3487a7cfee Implement MessageInputStreamFactory interface. 2018-12-06 14:05:36 +00:00
akwizgran
eea453fc5f Add placeholder BlockSource implementation to DB. 2018-12-06 13:55:49 +00:00
akwizgran
c6f460a936 Add input stream that fetches blocks from the DB. 2018-12-06 11:34:40 +00:00
39 changed files with 614 additions and 426 deletions

View File

@@ -208,6 +208,24 @@ public interface DatabaseComponent {
Collection<Message> generateRequestedBatch(Transaction txn, ContactId c,
int maxLength, int maxLatency) throws DbException;
/**
* Returns the number of blocks in the given message.
* <p>
* Read-only.
*/
int getBlockCount(Transaction txn, MessageId m) throws DbException;
/**
* Returns the given block of the given message.
* <p>
* Read-only.
*
* @throws NoSuchBlockException if 'blockNumber' is greater than or equal
* to the number of blocks in the message
*/
byte[] getBlock(Transaction txn, MessageId m, int blockNumber)
throws DbException;
/**
* Returns the contact with the given ID.
* <p/>

View File

@@ -0,0 +1,9 @@
package org.briarproject.bramble.api.db;
/**
* Thrown when a database operation is attempted for a block that is not in
* the database. This exception may occur due to concurrent updates and does
* not indicate a database error.
*/
public class NoSuchBlockException extends DbException {
}

View File

@@ -1,20 +0,0 @@
package org.briarproject.bramble.api.io;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.sync.tree.TreeHash;
import java.util.List;
public interface BlockSink {
/**
* Stores a block of the message with the given temporary ID.
*/
void putBlock(HashingId h, int blockNumber, byte[] data) throws DbException;
/**
* Sets the hash tree path of a previously stored block.
*/
void setPath(HashingId h, int blockNumber, List<TreeHash> path)
throws DbException;
}

View File

@@ -0,0 +1,21 @@
package org.briarproject.bramble.api.io;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.db.NoSuchBlockException;
import org.briarproject.bramble.api.sync.MessageId;
public interface BlockSource {
/**
* Returns the number of blocks in the given message.
*/
int getBlockCount(MessageId m) throws DbException;
/**
* Returns the given block of the given message.
*
* @throws NoSuchBlockException if 'blockNumber' is greater than or equal
* to the number of blocks in the message
*/
byte[] getBlock(MessageId m, int blockNumber) throws DbException;
}

View File

@@ -1,27 +0,0 @@
package org.briarproject.bramble.api.io;
import org.briarproject.bramble.api.UniqueId;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.MessageId;
import javax.annotation.concurrent.ThreadSafe;
/**
* Type-safe wrapper for a byte array that uniquely identifies a
* {@link Message} while it's being hashed and the {@link MessageId} is not
* yet known.
*/
@ThreadSafe
@NotNullByDefault
public class HashingId extends UniqueId {
public HashingId(byte[] id) {
super(id);
}
@Override
public boolean equals(Object o) {
return o instanceof HashingId && super.equals(o);
}
}

View File

@@ -0,0 +1,17 @@
package org.briarproject.bramble.api.io;
import org.briarproject.bramble.api.sync.MessageId;
import java.io.IOException;
import java.io.InputStream;
public interface MessageInputStreamFactory {
/**
* Returns an {@link InputStream} for reading the given message from the
* database. This method returns immediately. If the message is not in the
* database or cannot be read, reading from the stream will throw an
* {@link IOException};
*/
InputStream getMessageInputStream(MessageId m);
}

View File

@@ -1,7 +1,6 @@
package org.briarproject.bramble.api.sync;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.tree.TreeHash;
@NotNullByDefault
public interface MessageFactory {
@@ -11,6 +10,4 @@ public interface MessageFactory {
Message createMessage(byte[] raw);
byte[] getRawMessage(Message m);
MessageId getMessageId(GroupId g, long timestamp, TreeHash rootHash);
}

View File

@@ -24,12 +24,6 @@ public class MessageId extends UniqueId {
public static final String BLOCK_LABEL =
"org.briarproject.bramble/MESSAGE_BLOCK";
/**
* Label for hashing two tree hashes to produce a parent.
*/
public static final String TREE_LABEL =
"org.briarproject.bramble/MESSAGE_TREE";
public MessageId(byte[] id) {
super(id);
}

View File

@@ -1,21 +0,0 @@
package org.briarproject.bramble.api.sync.tree;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
@NotNullByDefault
public class LeafNode extends TreeNode {
public LeafNode(TreeHash hash, int blockNumber) {
super(hash, 0, blockNumber, blockNumber);
}
@Override
public TreeNode getLeftChild() {
throw new UnsupportedOperationException();
}
@Override
public TreeNode getRightChild() {
throw new UnsupportedOperationException();
}
}

View File

@@ -1,26 +0,0 @@
package org.briarproject.bramble.api.sync.tree;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
@NotNullByDefault
public class ParentNode extends TreeNode {
private final TreeNode left, right;
public ParentNode(TreeHash hash, TreeNode left, TreeNode right) {
super(hash, Math.max(left.getHeight(), right.getHeight()) + 1,
left.getFirstBlockNumber(), right.getLastBlockNumber());
this.left = left;
this.right = right;
}
@Override
public TreeNode getLeftChild() {
return left;
}
@Override
public TreeNode getRightChild() {
return right;
}
}

View File

@@ -1,21 +0,0 @@
package org.briarproject.bramble.api.sync.tree;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.io.BlockSink;
import org.briarproject.bramble.api.io.HashingId;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import java.io.IOException;
import java.io.InputStream;
@NotNullByDefault
public interface StreamHasher {
/**
* Reads the given input stream, divides the data into blocks, stores
* the blocks and the resulting hash tree using the given block sink and
* temporary ID, and returns the hash tree.
*/
TreeNode hash(InputStream in, BlockSink sink, HashingId h)
throws IOException, DbException;
}

View File

@@ -1,24 +0,0 @@
package org.briarproject.bramble.api.sync.tree;
import org.briarproject.bramble.api.UniqueId;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import javax.annotation.concurrent.ThreadSafe;
/**
* Type-safe wrapper for a byte array that uniquely identifies a sequence of
* one or more message blocks.
*/
@ThreadSafe
@NotNullByDefault
public class TreeHash extends UniqueId {
public TreeHash(byte[] id) {
super(id);
}
@Override
public boolean equals(Object o) {
return o instanceof TreeHash && super.equals(o);
}
}

View File

@@ -1,11 +0,0 @@
package org.briarproject.bramble.api.sync.tree;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
@NotNullByDefault
public interface TreeHasher {
LeafNode hashBlock(int blockNumber, byte[] data);
ParentNode mergeTrees(TreeNode left, TreeNode right);
}

View File

@@ -1,38 +0,0 @@
package org.briarproject.bramble.api.sync.tree;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
@NotNullByDefault
public abstract class TreeNode {
private final TreeHash hash;
private final int height, firstBlockNumber, lastBlockNumber;
TreeNode(TreeHash hash, int height, int firstBlockNumber,
int lastBlockNumber) {
this.hash = hash;
this.height = height;
this.firstBlockNumber = firstBlockNumber;
this.lastBlockNumber = lastBlockNumber;
}
public TreeHash getHash() {
return hash;
}
public int getHeight() {
return height;
}
public int getFirstBlockNumber() {
return firstBlockNumber;
}
public int getLastBlockNumber() {
return lastBlockNumber;
}
public abstract TreeNode getLeftChild();
public abstract TreeNode getRightChild();
}

View File

@@ -9,6 +9,7 @@ import org.briarproject.bramble.db.DatabaseExecutorModule;
import org.briarproject.bramble.db.DatabaseModule;
import org.briarproject.bramble.event.EventModule;
import org.briarproject.bramble.identity.IdentityModule;
import org.briarproject.bramble.io.IoModule;
import org.briarproject.bramble.keyagreement.KeyAgreementModule;
import org.briarproject.bramble.lifecycle.LifecycleModule;
import org.briarproject.bramble.plugin.PluginModule;
@@ -36,6 +37,7 @@ import dagger.Module;
DatabaseExecutorModule.class,
EventModule.class,
IdentityModule.class,
IoModule.class,
KeyAgreementModule.class,
LifecycleModule.class,
PluginModule.class,

View File

@@ -0,0 +1,29 @@
package org.briarproject.bramble.db;
import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.io.BlockSource;
import org.briarproject.bramble.api.sync.MessageId;
import javax.inject.Inject;
class BlockSourceImpl implements BlockSource {
private final DatabaseComponent db;
@Inject
BlockSourceImpl(DatabaseComponent db) {
this.db = db;
}
@Override
public int getBlockCount(MessageId m) throws DbException {
return db.transactionWithResult(true, txn -> db.getBlockCount(txn, m));
}
@Override
public byte[] getBlock(MessageId m, int blockNumber) throws DbException {
return db.transactionWithResult(true, txn ->
db.getBlock(txn, m, blockNumber));
}
}

View File

@@ -14,6 +14,7 @@ import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.db.DbRunnable;
import org.briarproject.bramble.api.db.Metadata;
import org.briarproject.bramble.api.db.MigrationListener;
import org.briarproject.bramble.api.db.NoSuchBlockException;
import org.briarproject.bramble.api.db.NoSuchContactException;
import org.briarproject.bramble.api.db.NoSuchGroupException;
import org.briarproject.bramble.api.db.NoSuchLocalAuthorException;
@@ -420,6 +421,26 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return messages;
}
@Override
public int getBlockCount(Transaction transaction, MessageId m)
throws DbException {
T txn = unbox(transaction);
if (!db.containsMessage(txn, m))
throw new NoSuchMessageException();
return 1;
}
@Override
public byte[] getBlock(Transaction transaction, MessageId m,
int blockNumber) throws DbException {
T txn = unbox(transaction);
if (!db.containsMessage(txn, m))
throw new NoSuchMessageException();
if (blockNumber != 0)
throw new NoSuchBlockException();
return db.getMessage(txn, m).getBody();
}
@Override
public Contact getContact(Transaction transaction, ContactId c)
throws DbException {

View File

@@ -3,6 +3,7 @@ package org.briarproject.bramble.db;
import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DatabaseConfig;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.io.BlockSource;
import org.briarproject.bramble.api.lifecycle.ShutdownManager;
import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.system.Clock;
@@ -31,4 +32,9 @@ public class DatabaseModule {
return new DatabaseComponentImpl<>(db, Connection.class, eventBus,
shutdown);
}
@Provides
BlockSource provideBlockSource(BlockSourceImpl blockSource) {
return blockSource;
}
}

View File

@@ -0,0 +1,155 @@
package org.briarproject.bramble.io;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import static java.lang.System.arraycopy;
import static java.lang.Thread.currentThread;
/**
* An {@link InputStream} that asynchronously fetches blocks of data on demand.
*/
@ThreadSafe
@NotNullByDefault
abstract class BlockInputStream extends InputStream {
private final int minBufferBytes;
private final BlockingQueue<Buffer> queue = new ArrayBlockingQueue<>(1);
private final Object lock = new Object();
@GuardedBy("lock")
@Nullable
private Buffer buffer = null;
@GuardedBy("lock")
private int offset = 0;
@GuardedBy("lock")
private boolean fetchingBlock = false;
abstract void fetchBlockAsync(int blockNumber);
BlockInputStream(int minBufferBytes) {
this.minBufferBytes = minBufferBytes;
}
@Override
public int read() throws IOException {
synchronized (lock) {
if (!prepareRead()) return -1;
if (buffer == null) throw new AssertionError();
return buffer.data[offset++] & 0xFF;
}
}
@Override
public int read(byte[] b) throws IOException {
return read(b, 0, b.length);
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
if (off < 0 || len < 0 || off + len > b.length)
throw new IllegalArgumentException();
synchronized (lock) {
if (!prepareRead()) return -1;
if (buffer == null) throw new AssertionError();
len = Math.min(len, buffer.length - offset);
if (len < 0) throw new AssertionError();
arraycopy(buffer.data, offset, b, off, len);
offset += len;
return len;
}
}
private boolean prepareRead() throws IOException {
throwExceptionIfNecessary();
if (isEndOfStream()) return false;
if (shouldFetchBlock()) fetchBlockAsync();
waitForBlock();
if (buffer == null) throw new AssertionError();
return offset < buffer.length;
}
@GuardedBy("lock")
private void throwExceptionIfNecessary() throws IOException {
if (buffer != null && buffer.exception != null)
throw new IOException(buffer.exception);
}
@GuardedBy("lock")
private boolean isEndOfStream() {
return buffer != null && offset == buffer.length && !fetchingBlock;
}
@GuardedBy("lock")
private boolean shouldFetchBlock() {
if (fetchingBlock) return false;
if (buffer == null) return true;
if (buffer.length == 0) return false;
return buffer.length - offset < minBufferBytes;
}
@GuardedBy("lock")
private void fetchBlockAsync() {
if (buffer == null) fetchBlockAsync(0);
else fetchBlockAsync(buffer.blockNumber + 1);
fetchingBlock = true;
}
@GuardedBy("lock")
private void waitForBlock() throws IOException {
if (buffer != null && offset < buffer.length) return;
try {
buffer = queue.take();
} catch (InterruptedException e) {
currentThread().interrupt();
throw new InterruptedIOException();
}
fetchingBlock = false;
offset = 0;
throwExceptionIfNecessary();
}
void fetchSucceeded(int blockNumber, byte[] data, int length) {
queue.add(new Buffer(blockNumber, data, length));
}
void fetchFailed(int blockNumber, Exception exception) {
queue.add(new Buffer(blockNumber, exception));
}
private static class Buffer {
private final int blockNumber;
private final byte[] data;
private final int length;
@Nullable
private final Exception exception;
private Buffer(int blockNumber, byte[] data, int length) {
if (length < 0 || length > data.length)
throw new IllegalArgumentException();
this.blockNumber = blockNumber;
this.data = data;
this.length = length;
exception = null;
}
private Buffer(int blockNumber, Exception exception) {
this.blockNumber = blockNumber;
this.exception = exception;
data = new byte[0];
length = 0;
}
}
}

View File

@@ -0,0 +1,53 @@
package org.briarproject.bramble.io;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.io.BlockSource;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.MessageId;
import java.util.concurrent.Executor;
import javax.annotation.concurrent.ThreadSafe;
/**
* A {@link BlockInputStream} that fetches data from a {@link BlockSource}.
*/
@ThreadSafe
@NotNullByDefault
class BlockSourceInputStream extends BlockInputStream {
private final Executor executor;
private final BlockSource blockSource;
private final MessageId messageId;
private volatile int blockCount = -1;
BlockSourceInputStream(int minBufferBytes, Executor executor,
BlockSource blockSource, MessageId messageId) {
super(minBufferBytes);
this.executor = executor;
this.blockSource = blockSource;
this.messageId = messageId;
}
@Override
void fetchBlockAsync(int blockNumber) {
executor.execute(() -> {
try {
if (blockCount == -1) {
blockCount = blockSource.getBlockCount(messageId);
}
if (blockNumber > blockCount) {
fetchFailed(blockNumber, new IllegalArgumentException());
} else if (blockNumber == blockCount) {
fetchSucceeded(blockNumber, new byte[0], 0); // EOF
} else {
byte[] block = blockSource.getBlock(messageId, blockNumber);
fetchSucceeded(blockNumber, block, block.length);
}
} catch (DbException e) {
fetchFailed(blockNumber, e);
}
});
}
}

View File

@@ -0,0 +1,16 @@
package org.briarproject.bramble.io;
import org.briarproject.bramble.api.io.MessageInputStreamFactory;
import dagger.Module;
import dagger.Provides;
@Module
public class IoModule {
@Provides
MessageInputStreamFactory provideMessageInputStreamFactory(
MessageInputStreamFactoryImpl messageInputStreamFactory) {
return messageInputStreamFactory;
}
}

View File

@@ -0,0 +1,32 @@
package org.briarproject.bramble.io;
import org.briarproject.bramble.api.db.DatabaseExecutor;
import org.briarproject.bramble.api.io.BlockSource;
import org.briarproject.bramble.api.io.MessageInputStreamFactory;
import org.briarproject.bramble.api.sync.MessageId;
import java.io.InputStream;
import java.util.concurrent.Executor;
import javax.inject.Inject;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_BLOCK_LENGTH;
class MessageInputStreamFactoryImpl implements MessageInputStreamFactory {
private final Executor dbExecutor;
private final BlockSource blockSource;
@Inject
MessageInputStreamFactoryImpl(@DatabaseExecutor Executor dbExecutor,
BlockSource blockSource) {
this.dbExecutor = dbExecutor;
this.blockSource = blockSource;
}
@Override
public InputStream getMessageInputStream(MessageId m) {
return new BlockSourceInputStream(MAX_BLOCK_LENGTH, dbExecutor,
blockSource, m);
}
}

View File

@@ -7,7 +7,6 @@ import org.briarproject.bramble.api.sync.GroupId;
import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.tree.TreeHash;
import org.briarproject.bramble.util.ByteUtils;
import javax.annotation.concurrent.Immutable;
@@ -40,19 +39,13 @@ class MessageFactoryImpl implements MessageFactory {
if (body.length == 0) throw new IllegalArgumentException();
if (body.length > MAX_MESSAGE_BODY_LENGTH)
throw new IllegalArgumentException();
MessageId id = getMessageIdFromBody(g, timestamp, body);
MessageId id = getMessageId(g, timestamp, body);
return new Message(id, g, timestamp, body);
}
private MessageId getMessageIdFromBody(GroupId g, long timestamp,
byte[] body) {
private MessageId getMessageId(GroupId g, long timestamp, byte[] body) {
// There's only one block, so the root hash is the hash of the block
byte[] rootHash = crypto.hash(BLOCK_LABEL, FORMAT_VERSION_BYTES, body);
return getMessageIdFromRootHash(g, timestamp, rootHash);
}
private MessageId getMessageIdFromRootHash(GroupId g, long timestamp,
byte[] rootHash) {
byte[] timeBytes = new byte[INT_64_BYTES];
ByteUtils.writeUint64(timestamp, timeBytes, 0);
byte[] idHash = crypto.hash(ID_LABEL, FORMAT_VERSION_BYTES,
@@ -72,7 +65,7 @@ class MessageFactoryImpl implements MessageFactory {
long timestamp = ByteUtils.readUint64(raw, UniqueId.LENGTH);
byte[] body = new byte[raw.length - MESSAGE_HEADER_LENGTH];
System.arraycopy(raw, MESSAGE_HEADER_LENGTH, body, 0, body.length);
MessageId id = getMessageIdFromBody(g, timestamp, body);
MessageId id = getMessageId(g, timestamp, body);
return new Message(id, g, timestamp, body);
}
@@ -85,10 +78,4 @@ class MessageFactoryImpl implements MessageFactory {
System.arraycopy(body, 0, raw, MESSAGE_HEADER_LENGTH, body.length);
return raw;
}
@Override
public MessageId getMessageId(GroupId g, long timestamp,
TreeHash rootHash) {
return getMessageIdFromRootHash(g, timestamp, rootHash.getBytes());
}
}

View File

@@ -1,16 +0,0 @@
package org.briarproject.bramble.sync.tree;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.tree.LeafNode;
import org.briarproject.bramble.api.sync.tree.TreeNode;
import javax.annotation.concurrent.NotThreadSafe;
@NotThreadSafe
@NotNullByDefault
interface HashTree {
void addLeaf(LeafNode leaf);
TreeNode getRoot();
}

View File

@@ -1,46 +0,0 @@
package org.briarproject.bramble.sync.tree;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.tree.LeafNode;
import org.briarproject.bramble.api.sync.tree.TreeHasher;
import org.briarproject.bramble.api.sync.tree.TreeNode;
import java.util.Deque;
import java.util.LinkedList;
import javax.inject.Inject;
@NotNullByDefault
class HashTreeImpl implements HashTree {
private final TreeHasher treeHasher;
private final Deque<TreeNode> nodes = new LinkedList<>();
@Inject
HashTreeImpl(TreeHasher treeHasher) {
this.treeHasher = treeHasher;
}
@Override
public void addLeaf(LeafNode leaf) {
TreeNode add = leaf;
int height = leaf.getHeight();
TreeNode last = nodes.peekLast();
while (last != null && last.getHeight() == height) {
add = treeHasher.mergeTrees(last, add);
height = add.getHeight();
nodes.removeLast();
last = nodes.peekLast();
}
nodes.addLast(add);
}
@Override
public TreeNode getRoot() {
TreeNode root = nodes.removeLast();
while (!nodes.isEmpty()) {
root = treeHasher.mergeTrees(nodes.removeLast(), root);
}
return root;
}
}

View File

@@ -1,85 +0,0 @@
package org.briarproject.bramble.sync.tree;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.io.BlockSink;
import org.briarproject.bramble.api.io.HashingId;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.tree.StreamHasher;
import org.briarproject.bramble.api.sync.tree.TreeHash;
import org.briarproject.bramble.api.sync.tree.TreeHasher;
import org.briarproject.bramble.api.sync.tree.TreeNode;
import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedList;
import javax.annotation.concurrent.Immutable;
import javax.inject.Inject;
import javax.inject.Provider;
import static java.util.Arrays.copyOfRange;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_BLOCK_LENGTH;
@Immutable
@NotNullByDefault
class StreamHasherImpl implements StreamHasher {
private final TreeHasher treeHasher;
private final Provider<HashTree> hashTreeProvider;
@Inject
StreamHasherImpl(TreeHasher treeHasher,
Provider<HashTree> hashTreeProvider) {
this.treeHasher = treeHasher;
this.hashTreeProvider = hashTreeProvider;
}
@Override
public TreeNode hash(InputStream in, BlockSink sink, HashingId h)
throws IOException, DbException {
HashTree tree = hashTreeProvider.get();
byte[] block = new byte[MAX_BLOCK_LENGTH];
int read;
for (int blockNumber = 0; (read = read(in, block)) > 0; blockNumber++) {
byte[] data;
if (read == block.length) data = block;
else data = copyOfRange(block, 0, read);
sink.putBlock(h, blockNumber, data);
tree.addLeaf(treeHasher.hashBlock(blockNumber, data));
}
TreeNode root = tree.getRoot();
setPaths(sink, h, root, new LinkedList<>());
return root;
}
/**
* Reads a block from the given input stream and returns the number of
* bytes read, or 0 if no bytes were read before reaching the end of the
* stream.
*/
private int read(InputStream in, byte[] block) throws IOException {
int offset = 0;
while (offset < block.length) {
int read = in.read(block, offset, block.length - offset);
if (read == -1) return offset;
offset += read;
}
return offset;
}
private void setPaths(BlockSink sink, HashingId h, TreeNode node,
LinkedList<TreeHash> path) throws DbException {
if (node.getHeight() == 0) {
// We've reached a leaf - store the path
sink.setPath(h, node.getFirstBlockNumber(), path);
} else {
// Add the right child's hash to the path and traverse the left
path.addFirst(node.getRightChild().getHash());
setPaths(sink, h, node.getLeftChild(), path);
// Add the left child's hash to the path and traverse the right
path.removeFirst();
path.addFirst(node.getLeftChild().getHash());
setPaths(sink, h, node.getRightChild(), path);
}
}
}

View File

@@ -1,44 +0,0 @@
package org.briarproject.bramble.sync.tree;
import org.briarproject.bramble.api.crypto.CryptoComponent;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.tree.LeafNode;
import org.briarproject.bramble.api.sync.tree.ParentNode;
import org.briarproject.bramble.api.sync.tree.TreeHash;
import org.briarproject.bramble.api.sync.tree.TreeHasher;
import org.briarproject.bramble.api.sync.tree.TreeNode;
import javax.annotation.concurrent.Immutable;
import javax.inject.Inject;
import static org.briarproject.bramble.api.sync.Message.FORMAT_VERSION;
import static org.briarproject.bramble.api.sync.MessageId.BLOCK_LABEL;
import static org.briarproject.bramble.api.sync.MessageId.TREE_LABEL;
@Immutable
@NotNullByDefault
class TreeHasherImpl implements TreeHasher {
private static final byte[] FORMAT_VERSION_BYTES =
new byte[] {FORMAT_VERSION};
private final CryptoComponent crypto;
@Inject
TreeHasherImpl(CryptoComponent crypto) {
this.crypto = crypto;
}
@Override
public LeafNode hashBlock(int blockNumber, byte[] data) {
byte[] hash = crypto.hash(BLOCK_LABEL, FORMAT_VERSION_BYTES, data);
return new LeafNode(new TreeHash(hash), blockNumber);
}
@Override
public ParentNode mergeTrees(TreeNode left, TreeNode right) {
byte[] hash = crypto.hash(TREE_LABEL, FORMAT_VERSION_BYTES,
left.getHash().getBytes(), right.getHash().getBytes());
return new ParentNode(new TreeHash(hash), left, right);
}
}

View File

@@ -0,0 +1,151 @@
package org.briarproject.bramble.io;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.io.BlockSource;
import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.test.BrambleMockTestCase;
import org.jmock.Expectations;
import org.jmock.lib.concurrent.Synchroniser;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Random;
import java.util.concurrent.Executor;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_BLOCK_LENGTH;
import static org.briarproject.bramble.test.TestUtils.getRandomId;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.spongycastle.util.Arrays.copyOfRange;
public class BlockSourceInputStreamTest extends BrambleMockTestCase {
private static final int MAX_DATA_BYTES = 1_000_000;
private static final int READ_BUFFER_BYTES = 4 * 1024;
private final BlockSource blockSource;
private final Random random = new Random();
private final Executor executor = newSingleThreadExecutor();
private final MessageId messageId = new MessageId(getRandomId());
public BlockSourceInputStreamTest() {
context.setThreadingPolicy(new Synchroniser());
blockSource = context.mock(BlockSource.class);
}
@Test
public void testReadSingleBytes() throws IOException {
byte[] data = createRandomData();
BlockSource source = new ByteArrayBlockSource(data, MAX_BLOCK_LENGTH);
InputStream in = new BlockSourceInputStream(MAX_BLOCK_LENGTH, executor,
source, messageId);
ByteArrayOutputStream out = new ByteArrayOutputStream();
//noinspection ForLoopReplaceableByForEach
for (int i = 0; i < data.length; i++) {
int read = in.read();
assertNotEquals(-1, read);
out.write(read);
}
assertEquals(-1, in.read());
in.close();
out.flush();
out.close();
assertArrayEquals(data, out.toByteArray());
}
@Test
public void testReadByteArrays() throws IOException {
byte[] data = createRandomData();
BlockSource source = new ByteArrayBlockSource(data, MAX_BLOCK_LENGTH);
InputStream in = new BlockSourceInputStream(MAX_BLOCK_LENGTH, executor,
source, messageId);
ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] buf = new byte[READ_BUFFER_BYTES];
int dataOffset = 0;
while (dataOffset < data.length) {
int length = Math.min(random.nextInt(buf.length) + 1,
data.length - dataOffset);
int bufOffset = 0;
if (length < buf.length)
bufOffset = random.nextInt(buf.length - length);
int read = in.read(buf, bufOffset, length);
assertNotEquals(-1, read);
out.write(buf, bufOffset, read);
dataOffset += read;
}
assertEquals(-1, in.read(buf, 0, 0));
in.close();
out.flush();
out.close();
assertArrayEquals(data, out.toByteArray());
}
@Test(expected = IOException.class)
public void testDbExceptionFromGetBlockCountIsRethrown() throws Exception {
context.checking(new Expectations() {{
oneOf(blockSource).getBlockCount(messageId);
will(throwException(new DbException()));
}});
InputStream in = new BlockSourceInputStream(MAX_BLOCK_LENGTH, executor,
blockSource, messageId);
//noinspection ResultOfMethodCallIgnored
in.read();
}
@Test(expected = IOException.class)
public void testDbExceptionFromGetBlockIsRethrown() throws Exception {
context.checking(new Expectations() {{
oneOf(blockSource).getBlockCount(messageId);
will(returnValue(1));
oneOf(blockSource).getBlock(messageId, 0);
will(throwException(new DbException()));
}});
InputStream in = new BlockSourceInputStream(MAX_BLOCK_LENGTH, executor,
blockSource, messageId);
//noinspection ResultOfMethodCallIgnored
in.read();
}
@Test
public void testReadFullBlockAtEndOfMessage() throws Exception {
testReadBlockAtEndOfMessage(MAX_BLOCK_LENGTH);
}
@Test
public void testReadPartialBlockAtEndOfMessage() throws Exception {
testReadBlockAtEndOfMessage(MAX_BLOCK_LENGTH - 1);
}
private void testReadBlockAtEndOfMessage(int blockLength) throws Exception {
byte[] block = new byte[blockLength];
random.nextBytes(block);
context.checking(new Expectations() {{
oneOf(blockSource).getBlockCount(messageId);
will(returnValue(1));
oneOf(blockSource).getBlock(messageId, 0);
will(returnValue(block));
}});
InputStream in = new BlockSourceInputStream(MAX_BLOCK_LENGTH, executor,
blockSource, messageId);
byte[] buf = new byte[MAX_BLOCK_LENGTH * 2];
assertEquals(block.length, in.read(buf, 0, buf.length));
assertArrayEquals(block, copyOfRange(buf, 0, block.length));
assertEquals(-1, in.read(buf, 0, buf.length));
}
private byte[] createRandomData() {
int length = random.nextInt(MAX_DATA_BYTES) + 1;
byte[] data = new byte[length];
random.nextBytes(data);
return data;
}
}

View File

@@ -0,0 +1,32 @@
package org.briarproject.bramble.io;
import org.briarproject.bramble.api.io.BlockSource;
import org.briarproject.bramble.api.sync.MessageId;
import static java.lang.System.arraycopy;
class ByteArrayBlockSource implements BlockSource {
private final byte[] data;
private final int blockBytes;
ByteArrayBlockSource(byte[] data, int blockBytes) {
this.data = data;
this.blockBytes = blockBytes;
}
@Override
public int getBlockCount(MessageId m) {
return (data.length + blockBytes - 1) / blockBytes;
}
@Override
public byte[] getBlock(MessageId m, int blockNumber) {
int offset = blockNumber * blockBytes;
if (offset >= data.length) throw new IllegalArgumentException();
int length = Math.min(blockBytes, data.length - offset);
byte[] block = new byte[length];
arraycopy(data, offset, block, 0, length);
return block;
}
}

View File

@@ -4,8 +4,6 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.GroupId;
import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.tree.TreeHash;
import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
@@ -29,10 +27,4 @@ public class TestMessageFactory implements MessageFactory {
System.arraycopy(body, 0, raw, MESSAGE_HEADER_LENGTH, body.length);
return raw;
}
@Override
public MessageId getMessageId(GroupId g, long timestamp,
TreeHash rootHash) {
throw new UnsupportedOperationException();
}
}

View File

@@ -9,6 +9,8 @@ import org.briarproject.briar.api.messaging.PrivateMessage;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
import static java.util.Collections.emptyList;
@Immutable
@NotNullByDefault
public abstract class ThreadedMessage extends PrivateMessage {
@@ -19,7 +21,7 @@ public abstract class ThreadedMessage extends PrivateMessage {
public ThreadedMessage(Message message, @Nullable MessageId parent,
Author author) {
super(message);
super(message, emptyList());
this.parent = parent;
this.author = author;
}

View File

@@ -3,6 +3,8 @@ package org.briarproject.briar.api.messaging;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.Message;
import java.util.List;
import javax.annotation.concurrent.Immutable;
@Immutable
@@ -10,13 +12,18 @@ import javax.annotation.concurrent.Immutable;
public class PrivateMessage {
private final Message message;
private final List<AttachmentHeader> attachments;
public PrivateMessage(Message message) {
public PrivateMessage(Message message, List<AttachmentHeader> attachments) {
this.message = message;
this.attachments = attachments;
}
public Message getMessage() {
return message;
}
public List<AttachmentHeader> getAttachmentHeaders() {
return attachments;
}
}

View File

@@ -11,13 +11,16 @@ import org.briarproject.bramble.api.data.BdfList;
import org.briarproject.bramble.api.data.MetadataParser;
import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.db.Metadata;
import org.briarproject.bramble.api.db.Transaction;
import org.briarproject.bramble.api.io.MessageInputStreamFactory;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.Client;
import org.briarproject.bramble.api.sync.Group;
import org.briarproject.bramble.api.sync.Group.Visibility;
import org.briarproject.bramble.api.sync.GroupId;
import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.MessageStatus;
import org.briarproject.bramble.api.versioning.ClientVersioningManager;
@@ -32,16 +35,17 @@ import org.briarproject.briar.api.messaging.PrivateMessageHeader;
import org.briarproject.briar.api.messaging.event.PrivateMessageReceivedEvent;
import org.briarproject.briar.client.ConversationClientImpl;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Random;
import javax.annotation.concurrent.Immutable;
import javax.inject.Inject;
import static java.util.Collections.emptyList;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_LENGTH;
import static org.briarproject.briar.client.MessageTrackerConstants.MSG_KEY_READ;
@Immutable
@@ -51,15 +55,21 @@ class MessagingManagerImpl extends ConversationClientImpl
private final ClientVersioningManager clientVersioningManager;
private final ContactGroupFactory contactGroupFactory;
private final MessageFactory messageFactory;
private final MessageInputStreamFactory messageInputStreamFactory;
@Inject
MessagingManagerImpl(DatabaseComponent db, ClientHelper clientHelper,
ClientVersioningManager clientVersioningManager,
MetadataParser metadataParser, MessageTracker messageTracker,
ContactGroupFactory contactGroupFactory) {
ContactGroupFactory contactGroupFactory,
MessageFactory messageFactory,
MessageInputStreamFactory messageInputStreamFactory) {
super(db, clientHelper, metadataParser, messageTracker);
this.clientVersioningManager = clientVersioningManager;
this.contactGroupFactory = contactGroupFactory;
this.messageFactory = messageFactory;
this.messageInputStreamFactory = messageInputStreamFactory;
}
@Override
@@ -142,9 +152,11 @@ class MessagingManagerImpl extends ConversationClientImpl
meta.put("read", true);
clientHelper.addLocalMessage(txn, m.getMessage(), meta, true);
messageTracker.trackOutgoingMessage(txn, m.getMessage());
for (AttachmentHeader h : m.getAttachmentHeaders())
db.setMessageShared(txn, h.getMessageId());
db.commitTransaction(txn);
} catch (FormatException e) {
throw new RuntimeException(e);
throw new AssertionError(e);
} finally {
db.endTransaction(txn);
}
@@ -152,11 +164,16 @@ class MessagingManagerImpl extends ConversationClientImpl
@Override
public AttachmentHeader addLocalAttachment(GroupId groupId, long timestamp,
String contentType, ByteBuffer data) {
// TODO add real implementation
byte[] b = new byte[MessageId.LENGTH];
new Random().nextBytes(b);
return new AttachmentHeader(new MessageId(b), "image/png");
String contentType, ByteBuffer data) throws DbException {
// TODO: Remove this restriction when large messages are supported
byte[] body = data.array();
if (body.length > MAX_MESSAGE_LENGTH) throw new DbException();
// TODO: Store message type and content type
Message m = messageFactory.createMessage(groupId, timestamp, body);
Metadata meta = new Metadata();
// Attachment will be shared when private message is added
db.transaction(false, txn -> db.addLocalMessage(txn, m, meta, false));
return new AttachmentHeader(m.getId(), contentType);
}
private ContactId getContactId(Transaction txn, GroupId g)
@@ -236,8 +253,9 @@ class MessagingManagerImpl extends ConversationClientImpl
@Override
public Attachment getAttachment(MessageId m) {
// TODO add real implementation
throw new IllegalStateException("Not yet implemented");
InputStream in = messageInputStreamFactory.getMessageInputStream(m);
// TODO: Read message type and content type
return new Attachment(in);
}
}

View File

@@ -39,6 +39,6 @@ class PrivateMessageFactoryImpl implements PrivateMessageFactory {
// Serialise the message
BdfList message = BdfList.of(text);
Message m = clientHelper.createMessage(groupId, timestamp, message);
return new PrivateMessage(m);
return new PrivateMessage(m, attachments);
}
}

View File

@@ -8,6 +8,7 @@ import org.briarproject.bramble.data.DataModule;
import org.briarproject.bramble.db.DatabaseModule;
import org.briarproject.bramble.event.EventModule;
import org.briarproject.bramble.identity.IdentityModule;
import org.briarproject.bramble.io.IoModule;
import org.briarproject.bramble.lifecycle.LifecycleModule;
import org.briarproject.bramble.properties.PropertiesModule;
import org.briarproject.bramble.record.RecordModule;
@@ -50,6 +51,7 @@ import dagger.Component;
GroupInvitationModule.class,
IdentityModule.class,
IntroductionModule.class,
IoModule.class,
LifecycleModule.class,
MessagingModule.class,
PrivateGroupModule.class,

View File

@@ -8,6 +8,7 @@ import org.briarproject.bramble.data.DataModule;
import org.briarproject.bramble.db.DatabaseModule;
import org.briarproject.bramble.event.EventModule;
import org.briarproject.bramble.identity.IdentityModule;
import org.briarproject.bramble.io.IoModule;
import org.briarproject.bramble.sync.SyncModule;
import org.briarproject.bramble.sync.validation.ValidationModule;
import org.briarproject.bramble.system.SystemModule;
@@ -40,6 +41,7 @@ import dagger.Component;
EventModule.class,
ForumModule.class,
IdentityModule.class,
IoModule.class,
MessagingModule.class,
SyncModule.class,
SystemModule.class,

View File

@@ -15,6 +15,7 @@ import org.briarproject.bramble.data.DataModule;
import org.briarproject.bramble.db.DatabaseModule;
import org.briarproject.bramble.event.EventModule;
import org.briarproject.bramble.identity.IdentityModule;
import org.briarproject.bramble.io.IoModule;
import org.briarproject.bramble.lifecycle.LifecycleModule;
import org.briarproject.bramble.record.RecordModule;
import org.briarproject.bramble.sync.SyncModule;
@@ -48,6 +49,7 @@ import dagger.Component;
DatabaseModule.class,
EventModule.class,
IdentityModule.class,
IoModule.class,
LifecycleModule.class,
MessagingModule.class,
RecordModule.class,

View File

@@ -17,6 +17,7 @@ import org.briarproject.bramble.data.DataModule;
import org.briarproject.bramble.db.DatabaseModule;
import org.briarproject.bramble.event.EventModule;
import org.briarproject.bramble.identity.IdentityModule;
import org.briarproject.bramble.io.IoModule;
import org.briarproject.bramble.lifecycle.LifecycleModule;
import org.briarproject.bramble.properties.PropertiesModule;
import org.briarproject.bramble.record.RecordModule;
@@ -68,6 +69,7 @@ import dagger.Component;
GroupInvitationModule.class,
IdentityModule.class,
IntroductionModule.class,
IoModule.class,
LifecycleModule.class,
MessagingModule.class,
PrivateGroupModule.class,

View File

@@ -62,7 +62,7 @@ internal class MessagingControllerImplTest : ControllerTest() {
emptyList()
)
private val sessionId = SessionId(getRandomId())
private val privateMessage = PrivateMessage(message)
private val privateMessage = PrivateMessage(message, emptyList())
@Test
fun list() {