mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-12 10:49:06 +01:00
Compare commits
5 Commits
hash-trees
...
block-inpu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
707066f1a1 | ||
|
|
65e5dc266f | ||
|
|
3487a7cfee | ||
|
|
eea453fc5f | ||
|
|
c6f460a936 |
@@ -208,6 +208,24 @@ public interface DatabaseComponent {
|
||||
Collection<Message> generateRequestedBatch(Transaction txn, ContactId c,
|
||||
int maxLength, int maxLatency) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns the number of blocks in the given message.
|
||||
* <p>
|
||||
* Read-only.
|
||||
*/
|
||||
int getBlockCount(Transaction txn, MessageId m) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns the given block of the given message.
|
||||
* <p>
|
||||
* Read-only.
|
||||
*
|
||||
* @throws NoSuchBlockException if 'blockNumber' is greater than or equal
|
||||
* to the number of blocks in the message
|
||||
*/
|
||||
byte[] getBlock(Transaction txn, MessageId m, int blockNumber)
|
||||
throws DbException;
|
||||
|
||||
/**
|
||||
* Returns the contact with the given ID.
|
||||
* <p/>
|
||||
|
||||
@@ -0,0 +1,9 @@
|
||||
package org.briarproject.bramble.api.db;
|
||||
|
||||
/**
|
||||
* Thrown when a database operation is attempted for a block that is not in
|
||||
* the database. This exception may occur due to concurrent updates and does
|
||||
* not indicate a database error.
|
||||
*/
|
||||
public class NoSuchBlockException extends DbException {
|
||||
}
|
||||
@@ -1,20 +0,0 @@
|
||||
package org.briarproject.bramble.api.io;
|
||||
|
||||
import org.briarproject.bramble.api.db.DbException;
|
||||
import org.briarproject.bramble.api.sync.tree.TreeHash;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface BlockSink {
|
||||
|
||||
/**
|
||||
* Stores a block of the message with the given temporary ID.
|
||||
*/
|
||||
void putBlock(HashingId h, int blockNumber, byte[] data) throws DbException;
|
||||
|
||||
/**
|
||||
* Sets the hash tree path of a previously stored block.
|
||||
*/
|
||||
void setPath(HashingId h, int blockNumber, List<TreeHash> path)
|
||||
throws DbException;
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
package org.briarproject.bramble.api.io;
|
||||
|
||||
import org.briarproject.bramble.api.db.DbException;
|
||||
import org.briarproject.bramble.api.db.NoSuchBlockException;
|
||||
import org.briarproject.bramble.api.sync.MessageId;
|
||||
|
||||
public interface BlockSource {
|
||||
|
||||
/**
|
||||
* Returns the number of blocks in the given message.
|
||||
*/
|
||||
int getBlockCount(MessageId m) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns the given block of the given message.
|
||||
*
|
||||
* @throws NoSuchBlockException if 'blockNumber' is greater than or equal
|
||||
* to the number of blocks in the message
|
||||
*/
|
||||
byte[] getBlock(MessageId m, int blockNumber) throws DbException;
|
||||
}
|
||||
@@ -1,27 +0,0 @@
|
||||
package org.briarproject.bramble.api.io;
|
||||
|
||||
import org.briarproject.bramble.api.UniqueId;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.sync.Message;
|
||||
import org.briarproject.bramble.api.sync.MessageId;
|
||||
|
||||
import javax.annotation.concurrent.ThreadSafe;
|
||||
|
||||
/**
|
||||
* Type-safe wrapper for a byte array that uniquely identifies a
|
||||
* {@link Message} while it's being hashed and the {@link MessageId} is not
|
||||
* yet known.
|
||||
*/
|
||||
@ThreadSafe
|
||||
@NotNullByDefault
|
||||
public class HashingId extends UniqueId {
|
||||
|
||||
public HashingId(byte[] id) {
|
||||
super(id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
return o instanceof HashingId && super.equals(o);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,17 @@
|
||||
package org.briarproject.bramble.api.io;
|
||||
|
||||
import org.briarproject.bramble.api.sync.MessageId;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
public interface MessageInputStreamFactory {
|
||||
|
||||
/**
|
||||
* Returns an {@link InputStream} for reading the given message from the
|
||||
* database. This method returns immediately. If the message is not in the
|
||||
* database or cannot be read, reading from the stream will throw an
|
||||
* {@link IOException};
|
||||
*/
|
||||
InputStream getMessageInputStream(MessageId m);
|
||||
}
|
||||
@@ -1,7 +1,6 @@
|
||||
package org.briarproject.bramble.api.sync;
|
||||
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.sync.tree.TreeHash;
|
||||
|
||||
@NotNullByDefault
|
||||
public interface MessageFactory {
|
||||
@@ -11,6 +10,4 @@ public interface MessageFactory {
|
||||
Message createMessage(byte[] raw);
|
||||
|
||||
byte[] getRawMessage(Message m);
|
||||
|
||||
MessageId getMessageId(GroupId g, long timestamp, TreeHash rootHash);
|
||||
}
|
||||
|
||||
@@ -24,12 +24,6 @@ public class MessageId extends UniqueId {
|
||||
public static final String BLOCK_LABEL =
|
||||
"org.briarproject.bramble/MESSAGE_BLOCK";
|
||||
|
||||
/**
|
||||
* Label for hashing two tree hashes to produce a parent.
|
||||
*/
|
||||
public static final String TREE_LABEL =
|
||||
"org.briarproject.bramble/MESSAGE_TREE";
|
||||
|
||||
public MessageId(byte[] id) {
|
||||
super(id);
|
||||
}
|
||||
|
||||
@@ -1,21 +0,0 @@
|
||||
package org.briarproject.bramble.api.sync.tree;
|
||||
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
|
||||
@NotNullByDefault
|
||||
public class LeafNode extends TreeNode {
|
||||
|
||||
public LeafNode(TreeHash hash, int blockNumber) {
|
||||
super(hash, 0, blockNumber, blockNumber);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TreeNode getLeftChild() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TreeNode getRightChild() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
@@ -1,26 +0,0 @@
|
||||
package org.briarproject.bramble.api.sync.tree;
|
||||
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
|
||||
@NotNullByDefault
|
||||
public class ParentNode extends TreeNode {
|
||||
|
||||
private final TreeNode left, right;
|
||||
|
||||
public ParentNode(TreeHash hash, TreeNode left, TreeNode right) {
|
||||
super(hash, Math.max(left.getHeight(), right.getHeight()) + 1,
|
||||
left.getFirstBlockNumber(), right.getLastBlockNumber());
|
||||
this.left = left;
|
||||
this.right = right;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TreeNode getLeftChild() {
|
||||
return left;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TreeNode getRightChild() {
|
||||
return right;
|
||||
}
|
||||
}
|
||||
@@ -1,21 +0,0 @@
|
||||
package org.briarproject.bramble.api.sync.tree;
|
||||
|
||||
import org.briarproject.bramble.api.db.DbException;
|
||||
import org.briarproject.bramble.api.io.BlockSink;
|
||||
import org.briarproject.bramble.api.io.HashingId;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
@NotNullByDefault
|
||||
public interface StreamHasher {
|
||||
|
||||
/**
|
||||
* Reads the given input stream, divides the data into blocks, stores
|
||||
* the blocks and the resulting hash tree using the given block sink and
|
||||
* temporary ID, and returns the hash tree.
|
||||
*/
|
||||
TreeNode hash(InputStream in, BlockSink sink, HashingId h)
|
||||
throws IOException, DbException;
|
||||
}
|
||||
@@ -1,24 +0,0 @@
|
||||
package org.briarproject.bramble.api.sync.tree;
|
||||
|
||||
import org.briarproject.bramble.api.UniqueId;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
|
||||
import javax.annotation.concurrent.ThreadSafe;
|
||||
|
||||
/**
|
||||
* Type-safe wrapper for a byte array that uniquely identifies a sequence of
|
||||
* one or more message blocks.
|
||||
*/
|
||||
@ThreadSafe
|
||||
@NotNullByDefault
|
||||
public class TreeHash extends UniqueId {
|
||||
|
||||
public TreeHash(byte[] id) {
|
||||
super(id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
return o instanceof TreeHash && super.equals(o);
|
||||
}
|
||||
}
|
||||
@@ -1,11 +0,0 @@
|
||||
package org.briarproject.bramble.api.sync.tree;
|
||||
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
|
||||
@NotNullByDefault
|
||||
public interface TreeHasher {
|
||||
|
||||
LeafNode hashBlock(int blockNumber, byte[] data);
|
||||
|
||||
ParentNode mergeTrees(TreeNode left, TreeNode right);
|
||||
}
|
||||
@@ -1,38 +0,0 @@
|
||||
package org.briarproject.bramble.api.sync.tree;
|
||||
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
|
||||
@NotNullByDefault
|
||||
public abstract class TreeNode {
|
||||
|
||||
private final TreeHash hash;
|
||||
private final int height, firstBlockNumber, lastBlockNumber;
|
||||
|
||||
TreeNode(TreeHash hash, int height, int firstBlockNumber,
|
||||
int lastBlockNumber) {
|
||||
this.hash = hash;
|
||||
this.height = height;
|
||||
this.firstBlockNumber = firstBlockNumber;
|
||||
this.lastBlockNumber = lastBlockNumber;
|
||||
}
|
||||
|
||||
public TreeHash getHash() {
|
||||
return hash;
|
||||
}
|
||||
|
||||
public int getHeight() {
|
||||
return height;
|
||||
}
|
||||
|
||||
public int getFirstBlockNumber() {
|
||||
return firstBlockNumber;
|
||||
}
|
||||
|
||||
public int getLastBlockNumber() {
|
||||
return lastBlockNumber;
|
||||
}
|
||||
|
||||
public abstract TreeNode getLeftChild();
|
||||
|
||||
public abstract TreeNode getRightChild();
|
||||
}
|
||||
@@ -9,6 +9,7 @@ import org.briarproject.bramble.db.DatabaseExecutorModule;
|
||||
import org.briarproject.bramble.db.DatabaseModule;
|
||||
import org.briarproject.bramble.event.EventModule;
|
||||
import org.briarproject.bramble.identity.IdentityModule;
|
||||
import org.briarproject.bramble.io.IoModule;
|
||||
import org.briarproject.bramble.keyagreement.KeyAgreementModule;
|
||||
import org.briarproject.bramble.lifecycle.LifecycleModule;
|
||||
import org.briarproject.bramble.plugin.PluginModule;
|
||||
@@ -36,6 +37,7 @@ import dagger.Module;
|
||||
DatabaseExecutorModule.class,
|
||||
EventModule.class,
|
||||
IdentityModule.class,
|
||||
IoModule.class,
|
||||
KeyAgreementModule.class,
|
||||
LifecycleModule.class,
|
||||
PluginModule.class,
|
||||
|
||||
@@ -0,0 +1,29 @@
|
||||
package org.briarproject.bramble.db;
|
||||
|
||||
import org.briarproject.bramble.api.db.DatabaseComponent;
|
||||
import org.briarproject.bramble.api.db.DbException;
|
||||
import org.briarproject.bramble.api.io.BlockSource;
|
||||
import org.briarproject.bramble.api.sync.MessageId;
|
||||
|
||||
import javax.inject.Inject;
|
||||
|
||||
class BlockSourceImpl implements BlockSource {
|
||||
|
||||
private final DatabaseComponent db;
|
||||
|
||||
@Inject
|
||||
BlockSourceImpl(DatabaseComponent db) {
|
||||
this.db = db;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getBlockCount(MessageId m) throws DbException {
|
||||
return db.transactionWithResult(true, txn -> db.getBlockCount(txn, m));
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getBlock(MessageId m, int blockNumber) throws DbException {
|
||||
return db.transactionWithResult(true, txn ->
|
||||
db.getBlock(txn, m, blockNumber));
|
||||
}
|
||||
}
|
||||
@@ -14,6 +14,7 @@ import org.briarproject.bramble.api.db.DbException;
|
||||
import org.briarproject.bramble.api.db.DbRunnable;
|
||||
import org.briarproject.bramble.api.db.Metadata;
|
||||
import org.briarproject.bramble.api.db.MigrationListener;
|
||||
import org.briarproject.bramble.api.db.NoSuchBlockException;
|
||||
import org.briarproject.bramble.api.db.NoSuchContactException;
|
||||
import org.briarproject.bramble.api.db.NoSuchGroupException;
|
||||
import org.briarproject.bramble.api.db.NoSuchLocalAuthorException;
|
||||
@@ -420,6 +421,26 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
||||
return messages;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getBlockCount(Transaction transaction, MessageId m)
|
||||
throws DbException {
|
||||
T txn = unbox(transaction);
|
||||
if (!db.containsMessage(txn, m))
|
||||
throw new NoSuchMessageException();
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getBlock(Transaction transaction, MessageId m,
|
||||
int blockNumber) throws DbException {
|
||||
T txn = unbox(transaction);
|
||||
if (!db.containsMessage(txn, m))
|
||||
throw new NoSuchMessageException();
|
||||
if (blockNumber != 0)
|
||||
throw new NoSuchBlockException();
|
||||
return db.getMessage(txn, m).getBody();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Contact getContact(Transaction transaction, ContactId c)
|
||||
throws DbException {
|
||||
|
||||
@@ -3,6 +3,7 @@ package org.briarproject.bramble.db;
|
||||
import org.briarproject.bramble.api.db.DatabaseComponent;
|
||||
import org.briarproject.bramble.api.db.DatabaseConfig;
|
||||
import org.briarproject.bramble.api.event.EventBus;
|
||||
import org.briarproject.bramble.api.io.BlockSource;
|
||||
import org.briarproject.bramble.api.lifecycle.ShutdownManager;
|
||||
import org.briarproject.bramble.api.sync.MessageFactory;
|
||||
import org.briarproject.bramble.api.system.Clock;
|
||||
@@ -31,4 +32,9 @@ public class DatabaseModule {
|
||||
return new DatabaseComponentImpl<>(db, Connection.class, eventBus,
|
||||
shutdown);
|
||||
}
|
||||
|
||||
@Provides
|
||||
BlockSource provideBlockSource(BlockSourceImpl blockSource) {
|
||||
return blockSource;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,155 @@
|
||||
package org.briarproject.bramble.io;
|
||||
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import javax.annotation.concurrent.GuardedBy;
|
||||
import javax.annotation.concurrent.ThreadSafe;
|
||||
|
||||
import static java.lang.System.arraycopy;
|
||||
import static java.lang.Thread.currentThread;
|
||||
|
||||
/**
|
||||
* An {@link InputStream} that asynchronously fetches blocks of data on demand.
|
||||
*/
|
||||
@ThreadSafe
|
||||
@NotNullByDefault
|
||||
abstract class BlockInputStream extends InputStream {
|
||||
|
||||
private final int minBufferBytes;
|
||||
private final BlockingQueue<Buffer> queue = new ArrayBlockingQueue<>(1);
|
||||
private final Object lock = new Object();
|
||||
|
||||
@GuardedBy("lock")
|
||||
@Nullable
|
||||
private Buffer buffer = null;
|
||||
|
||||
@GuardedBy("lock")
|
||||
private int offset = 0;
|
||||
|
||||
@GuardedBy("lock")
|
||||
private boolean fetchingBlock = false;
|
||||
|
||||
abstract void fetchBlockAsync(int blockNumber);
|
||||
|
||||
BlockInputStream(int minBufferBytes) {
|
||||
this.minBufferBytes = minBufferBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
synchronized (lock) {
|
||||
if (!prepareRead()) return -1;
|
||||
if (buffer == null) throw new AssertionError();
|
||||
return buffer.data[offset++] & 0xFF;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] b) throws IOException {
|
||||
return read(b, 0, b.length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] b, int off, int len) throws IOException {
|
||||
if (off < 0 || len < 0 || off + len > b.length)
|
||||
throw new IllegalArgumentException();
|
||||
synchronized (lock) {
|
||||
if (!prepareRead()) return -1;
|
||||
if (buffer == null) throw new AssertionError();
|
||||
len = Math.min(len, buffer.length - offset);
|
||||
if (len < 0) throw new AssertionError();
|
||||
arraycopy(buffer.data, offset, b, off, len);
|
||||
offset += len;
|
||||
return len;
|
||||
}
|
||||
}
|
||||
|
||||
private boolean prepareRead() throws IOException {
|
||||
throwExceptionIfNecessary();
|
||||
if (isEndOfStream()) return false;
|
||||
if (shouldFetchBlock()) fetchBlockAsync();
|
||||
waitForBlock();
|
||||
if (buffer == null) throw new AssertionError();
|
||||
return offset < buffer.length;
|
||||
}
|
||||
|
||||
@GuardedBy("lock")
|
||||
private void throwExceptionIfNecessary() throws IOException {
|
||||
if (buffer != null && buffer.exception != null)
|
||||
throw new IOException(buffer.exception);
|
||||
}
|
||||
|
||||
@GuardedBy("lock")
|
||||
private boolean isEndOfStream() {
|
||||
return buffer != null && offset == buffer.length && !fetchingBlock;
|
||||
}
|
||||
|
||||
@GuardedBy("lock")
|
||||
private boolean shouldFetchBlock() {
|
||||
if (fetchingBlock) return false;
|
||||
if (buffer == null) return true;
|
||||
if (buffer.length == 0) return false;
|
||||
return buffer.length - offset < minBufferBytes;
|
||||
}
|
||||
|
||||
@GuardedBy("lock")
|
||||
private void fetchBlockAsync() {
|
||||
if (buffer == null) fetchBlockAsync(0);
|
||||
else fetchBlockAsync(buffer.blockNumber + 1);
|
||||
fetchingBlock = true;
|
||||
}
|
||||
|
||||
@GuardedBy("lock")
|
||||
private void waitForBlock() throws IOException {
|
||||
if (buffer != null && offset < buffer.length) return;
|
||||
try {
|
||||
buffer = queue.take();
|
||||
} catch (InterruptedException e) {
|
||||
currentThread().interrupt();
|
||||
throw new InterruptedIOException();
|
||||
}
|
||||
fetchingBlock = false;
|
||||
offset = 0;
|
||||
throwExceptionIfNecessary();
|
||||
}
|
||||
|
||||
void fetchSucceeded(int blockNumber, byte[] data, int length) {
|
||||
queue.add(new Buffer(blockNumber, data, length));
|
||||
}
|
||||
|
||||
void fetchFailed(int blockNumber, Exception exception) {
|
||||
queue.add(new Buffer(blockNumber, exception));
|
||||
}
|
||||
|
||||
private static class Buffer {
|
||||
|
||||
private final int blockNumber;
|
||||
private final byte[] data;
|
||||
private final int length;
|
||||
@Nullable
|
||||
private final Exception exception;
|
||||
|
||||
private Buffer(int blockNumber, byte[] data, int length) {
|
||||
if (length < 0 || length > data.length)
|
||||
throw new IllegalArgumentException();
|
||||
this.blockNumber = blockNumber;
|
||||
this.data = data;
|
||||
this.length = length;
|
||||
exception = null;
|
||||
}
|
||||
|
||||
private Buffer(int blockNumber, Exception exception) {
|
||||
this.blockNumber = blockNumber;
|
||||
this.exception = exception;
|
||||
data = new byte[0];
|
||||
length = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
package org.briarproject.bramble.io;
|
||||
|
||||
import org.briarproject.bramble.api.db.DbException;
|
||||
import org.briarproject.bramble.api.io.BlockSource;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.sync.MessageId;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import javax.annotation.concurrent.ThreadSafe;
|
||||
|
||||
/**
|
||||
* A {@link BlockInputStream} that fetches data from a {@link BlockSource}.
|
||||
*/
|
||||
@ThreadSafe
|
||||
@NotNullByDefault
|
||||
class BlockSourceInputStream extends BlockInputStream {
|
||||
|
||||
private final Executor executor;
|
||||
private final BlockSource blockSource;
|
||||
private final MessageId messageId;
|
||||
|
||||
private volatile int blockCount = -1;
|
||||
|
||||
BlockSourceInputStream(int minBufferBytes, Executor executor,
|
||||
BlockSource blockSource, MessageId messageId) {
|
||||
super(minBufferBytes);
|
||||
this.executor = executor;
|
||||
this.blockSource = blockSource;
|
||||
this.messageId = messageId;
|
||||
}
|
||||
|
||||
@Override
|
||||
void fetchBlockAsync(int blockNumber) {
|
||||
executor.execute(() -> {
|
||||
try {
|
||||
if (blockCount == -1) {
|
||||
blockCount = blockSource.getBlockCount(messageId);
|
||||
}
|
||||
if (blockNumber > blockCount) {
|
||||
fetchFailed(blockNumber, new IllegalArgumentException());
|
||||
} else if (blockNumber == blockCount) {
|
||||
fetchSucceeded(blockNumber, new byte[0], 0); // EOF
|
||||
} else {
|
||||
byte[] block = blockSource.getBlock(messageId, blockNumber);
|
||||
fetchSucceeded(blockNumber, block, block.length);
|
||||
}
|
||||
} catch (DbException e) {
|
||||
fetchFailed(blockNumber, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,16 @@
|
||||
package org.briarproject.bramble.io;
|
||||
|
||||
import org.briarproject.bramble.api.io.MessageInputStreamFactory;
|
||||
|
||||
import dagger.Module;
|
||||
import dagger.Provides;
|
||||
|
||||
@Module
|
||||
public class IoModule {
|
||||
|
||||
@Provides
|
||||
MessageInputStreamFactory provideMessageInputStreamFactory(
|
||||
MessageInputStreamFactoryImpl messageInputStreamFactory) {
|
||||
return messageInputStreamFactory;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
package org.briarproject.bramble.io;
|
||||
|
||||
import org.briarproject.bramble.api.db.DatabaseExecutor;
|
||||
import org.briarproject.bramble.api.io.BlockSource;
|
||||
import org.briarproject.bramble.api.io.MessageInputStreamFactory;
|
||||
import org.briarproject.bramble.api.sync.MessageId;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import javax.inject.Inject;
|
||||
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_BLOCK_LENGTH;
|
||||
|
||||
class MessageInputStreamFactoryImpl implements MessageInputStreamFactory {
|
||||
|
||||
private final Executor dbExecutor;
|
||||
private final BlockSource blockSource;
|
||||
|
||||
@Inject
|
||||
MessageInputStreamFactoryImpl(@DatabaseExecutor Executor dbExecutor,
|
||||
BlockSource blockSource) {
|
||||
this.dbExecutor = dbExecutor;
|
||||
this.blockSource = blockSource;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream getMessageInputStream(MessageId m) {
|
||||
return new BlockSourceInputStream(MAX_BLOCK_LENGTH, dbExecutor,
|
||||
blockSource, m);
|
||||
}
|
||||
}
|
||||
@@ -7,7 +7,6 @@ import org.briarproject.bramble.api.sync.GroupId;
|
||||
import org.briarproject.bramble.api.sync.Message;
|
||||
import org.briarproject.bramble.api.sync.MessageFactory;
|
||||
import org.briarproject.bramble.api.sync.MessageId;
|
||||
import org.briarproject.bramble.api.sync.tree.TreeHash;
|
||||
import org.briarproject.bramble.util.ByteUtils;
|
||||
|
||||
import javax.annotation.concurrent.Immutable;
|
||||
@@ -40,19 +39,13 @@ class MessageFactoryImpl implements MessageFactory {
|
||||
if (body.length == 0) throw new IllegalArgumentException();
|
||||
if (body.length > MAX_MESSAGE_BODY_LENGTH)
|
||||
throw new IllegalArgumentException();
|
||||
MessageId id = getMessageIdFromBody(g, timestamp, body);
|
||||
MessageId id = getMessageId(g, timestamp, body);
|
||||
return new Message(id, g, timestamp, body);
|
||||
}
|
||||
|
||||
private MessageId getMessageIdFromBody(GroupId g, long timestamp,
|
||||
byte[] body) {
|
||||
private MessageId getMessageId(GroupId g, long timestamp, byte[] body) {
|
||||
// There's only one block, so the root hash is the hash of the block
|
||||
byte[] rootHash = crypto.hash(BLOCK_LABEL, FORMAT_VERSION_BYTES, body);
|
||||
return getMessageIdFromRootHash(g, timestamp, rootHash);
|
||||
}
|
||||
|
||||
private MessageId getMessageIdFromRootHash(GroupId g, long timestamp,
|
||||
byte[] rootHash) {
|
||||
byte[] timeBytes = new byte[INT_64_BYTES];
|
||||
ByteUtils.writeUint64(timestamp, timeBytes, 0);
|
||||
byte[] idHash = crypto.hash(ID_LABEL, FORMAT_VERSION_BYTES,
|
||||
@@ -72,7 +65,7 @@ class MessageFactoryImpl implements MessageFactory {
|
||||
long timestamp = ByteUtils.readUint64(raw, UniqueId.LENGTH);
|
||||
byte[] body = new byte[raw.length - MESSAGE_HEADER_LENGTH];
|
||||
System.arraycopy(raw, MESSAGE_HEADER_LENGTH, body, 0, body.length);
|
||||
MessageId id = getMessageIdFromBody(g, timestamp, body);
|
||||
MessageId id = getMessageId(g, timestamp, body);
|
||||
return new Message(id, g, timestamp, body);
|
||||
}
|
||||
|
||||
@@ -85,10 +78,4 @@ class MessageFactoryImpl implements MessageFactory {
|
||||
System.arraycopy(body, 0, raw, MESSAGE_HEADER_LENGTH, body.length);
|
||||
return raw;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageId getMessageId(GroupId g, long timestamp,
|
||||
TreeHash rootHash) {
|
||||
return getMessageIdFromRootHash(g, timestamp, rootHash.getBytes());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,16 +0,0 @@
|
||||
package org.briarproject.bramble.sync.tree;
|
||||
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.sync.tree.LeafNode;
|
||||
import org.briarproject.bramble.api.sync.tree.TreeNode;
|
||||
|
||||
import javax.annotation.concurrent.NotThreadSafe;
|
||||
|
||||
@NotThreadSafe
|
||||
@NotNullByDefault
|
||||
interface HashTree {
|
||||
|
||||
void addLeaf(LeafNode leaf);
|
||||
|
||||
TreeNode getRoot();
|
||||
}
|
||||
@@ -1,46 +0,0 @@
|
||||
package org.briarproject.bramble.sync.tree;
|
||||
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.sync.tree.LeafNode;
|
||||
import org.briarproject.bramble.api.sync.tree.TreeHasher;
|
||||
import org.briarproject.bramble.api.sync.tree.TreeNode;
|
||||
|
||||
import java.util.Deque;
|
||||
import java.util.LinkedList;
|
||||
|
||||
import javax.inject.Inject;
|
||||
|
||||
@NotNullByDefault
|
||||
class HashTreeImpl implements HashTree {
|
||||
|
||||
private final TreeHasher treeHasher;
|
||||
private final Deque<TreeNode> nodes = new LinkedList<>();
|
||||
|
||||
@Inject
|
||||
HashTreeImpl(TreeHasher treeHasher) {
|
||||
this.treeHasher = treeHasher;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addLeaf(LeafNode leaf) {
|
||||
TreeNode add = leaf;
|
||||
int height = leaf.getHeight();
|
||||
TreeNode last = nodes.peekLast();
|
||||
while (last != null && last.getHeight() == height) {
|
||||
add = treeHasher.mergeTrees(last, add);
|
||||
height = add.getHeight();
|
||||
nodes.removeLast();
|
||||
last = nodes.peekLast();
|
||||
}
|
||||
nodes.addLast(add);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TreeNode getRoot() {
|
||||
TreeNode root = nodes.removeLast();
|
||||
while (!nodes.isEmpty()) {
|
||||
root = treeHasher.mergeTrees(nodes.removeLast(), root);
|
||||
}
|
||||
return root;
|
||||
}
|
||||
}
|
||||
@@ -1,85 +0,0 @@
|
||||
package org.briarproject.bramble.sync.tree;
|
||||
|
||||
import org.briarproject.bramble.api.db.DbException;
|
||||
import org.briarproject.bramble.api.io.BlockSink;
|
||||
import org.briarproject.bramble.api.io.HashingId;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.sync.tree.StreamHasher;
|
||||
import org.briarproject.bramble.api.sync.tree.TreeHash;
|
||||
import org.briarproject.bramble.api.sync.tree.TreeHasher;
|
||||
import org.briarproject.bramble.api.sync.tree.TreeNode;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.LinkedList;
|
||||
|
||||
import javax.annotation.concurrent.Immutable;
|
||||
import javax.inject.Inject;
|
||||
import javax.inject.Provider;
|
||||
|
||||
import static java.util.Arrays.copyOfRange;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_BLOCK_LENGTH;
|
||||
|
||||
@Immutable
|
||||
@NotNullByDefault
|
||||
class StreamHasherImpl implements StreamHasher {
|
||||
|
||||
private final TreeHasher treeHasher;
|
||||
private final Provider<HashTree> hashTreeProvider;
|
||||
|
||||
@Inject
|
||||
StreamHasherImpl(TreeHasher treeHasher,
|
||||
Provider<HashTree> hashTreeProvider) {
|
||||
this.treeHasher = treeHasher;
|
||||
this.hashTreeProvider = hashTreeProvider;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TreeNode hash(InputStream in, BlockSink sink, HashingId h)
|
||||
throws IOException, DbException {
|
||||
HashTree tree = hashTreeProvider.get();
|
||||
byte[] block = new byte[MAX_BLOCK_LENGTH];
|
||||
int read;
|
||||
for (int blockNumber = 0; (read = read(in, block)) > 0; blockNumber++) {
|
||||
byte[] data;
|
||||
if (read == block.length) data = block;
|
||||
else data = copyOfRange(block, 0, read);
|
||||
sink.putBlock(h, blockNumber, data);
|
||||
tree.addLeaf(treeHasher.hashBlock(blockNumber, data));
|
||||
}
|
||||
TreeNode root = tree.getRoot();
|
||||
setPaths(sink, h, root, new LinkedList<>());
|
||||
return root;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads a block from the given input stream and returns the number of
|
||||
* bytes read, or 0 if no bytes were read before reaching the end of the
|
||||
* stream.
|
||||
*/
|
||||
private int read(InputStream in, byte[] block) throws IOException {
|
||||
int offset = 0;
|
||||
while (offset < block.length) {
|
||||
int read = in.read(block, offset, block.length - offset);
|
||||
if (read == -1) return offset;
|
||||
offset += read;
|
||||
}
|
||||
return offset;
|
||||
}
|
||||
|
||||
private void setPaths(BlockSink sink, HashingId h, TreeNode node,
|
||||
LinkedList<TreeHash> path) throws DbException {
|
||||
if (node.getHeight() == 0) {
|
||||
// We've reached a leaf - store the path
|
||||
sink.setPath(h, node.getFirstBlockNumber(), path);
|
||||
} else {
|
||||
// Add the right child's hash to the path and traverse the left
|
||||
path.addFirst(node.getRightChild().getHash());
|
||||
setPaths(sink, h, node.getLeftChild(), path);
|
||||
// Add the left child's hash to the path and traverse the right
|
||||
path.removeFirst();
|
||||
path.addFirst(node.getLeftChild().getHash());
|
||||
setPaths(sink, h, node.getRightChild(), path);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,44 +0,0 @@
|
||||
package org.briarproject.bramble.sync.tree;
|
||||
|
||||
import org.briarproject.bramble.api.crypto.CryptoComponent;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.sync.tree.LeafNode;
|
||||
import org.briarproject.bramble.api.sync.tree.ParentNode;
|
||||
import org.briarproject.bramble.api.sync.tree.TreeHash;
|
||||
import org.briarproject.bramble.api.sync.tree.TreeHasher;
|
||||
import org.briarproject.bramble.api.sync.tree.TreeNode;
|
||||
|
||||
import javax.annotation.concurrent.Immutable;
|
||||
import javax.inject.Inject;
|
||||
|
||||
import static org.briarproject.bramble.api.sync.Message.FORMAT_VERSION;
|
||||
import static org.briarproject.bramble.api.sync.MessageId.BLOCK_LABEL;
|
||||
import static org.briarproject.bramble.api.sync.MessageId.TREE_LABEL;
|
||||
|
||||
@Immutable
|
||||
@NotNullByDefault
|
||||
class TreeHasherImpl implements TreeHasher {
|
||||
|
||||
private static final byte[] FORMAT_VERSION_BYTES =
|
||||
new byte[] {FORMAT_VERSION};
|
||||
|
||||
private final CryptoComponent crypto;
|
||||
|
||||
@Inject
|
||||
TreeHasherImpl(CryptoComponent crypto) {
|
||||
this.crypto = crypto;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LeafNode hashBlock(int blockNumber, byte[] data) {
|
||||
byte[] hash = crypto.hash(BLOCK_LABEL, FORMAT_VERSION_BYTES, data);
|
||||
return new LeafNode(new TreeHash(hash), blockNumber);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ParentNode mergeTrees(TreeNode left, TreeNode right) {
|
||||
byte[] hash = crypto.hash(TREE_LABEL, FORMAT_VERSION_BYTES,
|
||||
left.getHash().getBytes(), right.getHash().getBytes());
|
||||
return new ParentNode(new TreeHash(hash), left, right);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,151 @@
|
||||
package org.briarproject.bramble.io;
|
||||
|
||||
import org.briarproject.bramble.api.db.DbException;
|
||||
import org.briarproject.bramble.api.io.BlockSource;
|
||||
import org.briarproject.bramble.api.sync.MessageId;
|
||||
import org.briarproject.bramble.test.BrambleMockTestCase;
|
||||
import org.jmock.Expectations;
|
||||
import org.jmock.lib.concurrent.Synchroniser;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import static java.util.concurrent.Executors.newSingleThreadExecutor;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_BLOCK_LENGTH;
|
||||
import static org.briarproject.bramble.test.TestUtils.getRandomId;
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.spongycastle.util.Arrays.copyOfRange;
|
||||
|
||||
public class BlockSourceInputStreamTest extends BrambleMockTestCase {
|
||||
|
||||
private static final int MAX_DATA_BYTES = 1_000_000;
|
||||
private static final int READ_BUFFER_BYTES = 4 * 1024;
|
||||
|
||||
private final BlockSource blockSource;
|
||||
|
||||
private final Random random = new Random();
|
||||
private final Executor executor = newSingleThreadExecutor();
|
||||
private final MessageId messageId = new MessageId(getRandomId());
|
||||
|
||||
public BlockSourceInputStreamTest() {
|
||||
context.setThreadingPolicy(new Synchroniser());
|
||||
blockSource = context.mock(BlockSource.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadSingleBytes() throws IOException {
|
||||
byte[] data = createRandomData();
|
||||
BlockSource source = new ByteArrayBlockSource(data, MAX_BLOCK_LENGTH);
|
||||
InputStream in = new BlockSourceInputStream(MAX_BLOCK_LENGTH, executor,
|
||||
source, messageId);
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
//noinspection ForLoopReplaceableByForEach
|
||||
for (int i = 0; i < data.length; i++) {
|
||||
int read = in.read();
|
||||
assertNotEquals(-1, read);
|
||||
out.write(read);
|
||||
}
|
||||
assertEquals(-1, in.read());
|
||||
in.close();
|
||||
out.flush();
|
||||
out.close();
|
||||
assertArrayEquals(data, out.toByteArray());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadByteArrays() throws IOException {
|
||||
byte[] data = createRandomData();
|
||||
BlockSource source = new ByteArrayBlockSource(data, MAX_BLOCK_LENGTH);
|
||||
InputStream in = new BlockSourceInputStream(MAX_BLOCK_LENGTH, executor,
|
||||
source, messageId);
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
byte[] buf = new byte[READ_BUFFER_BYTES];
|
||||
int dataOffset = 0;
|
||||
while (dataOffset < data.length) {
|
||||
int length = Math.min(random.nextInt(buf.length) + 1,
|
||||
data.length - dataOffset);
|
||||
int bufOffset = 0;
|
||||
if (length < buf.length)
|
||||
bufOffset = random.nextInt(buf.length - length);
|
||||
int read = in.read(buf, bufOffset, length);
|
||||
assertNotEquals(-1, read);
|
||||
out.write(buf, bufOffset, read);
|
||||
dataOffset += read;
|
||||
}
|
||||
assertEquals(-1, in.read(buf, 0, 0));
|
||||
in.close();
|
||||
out.flush();
|
||||
out.close();
|
||||
assertArrayEquals(data, out.toByteArray());
|
||||
}
|
||||
|
||||
@Test(expected = IOException.class)
|
||||
public void testDbExceptionFromGetBlockCountIsRethrown() throws Exception {
|
||||
context.checking(new Expectations() {{
|
||||
oneOf(blockSource).getBlockCount(messageId);
|
||||
will(throwException(new DbException()));
|
||||
}});
|
||||
|
||||
InputStream in = new BlockSourceInputStream(MAX_BLOCK_LENGTH, executor,
|
||||
blockSource, messageId);
|
||||
//noinspection ResultOfMethodCallIgnored
|
||||
in.read();
|
||||
}
|
||||
|
||||
@Test(expected = IOException.class)
|
||||
public void testDbExceptionFromGetBlockIsRethrown() throws Exception {
|
||||
context.checking(new Expectations() {{
|
||||
oneOf(blockSource).getBlockCount(messageId);
|
||||
will(returnValue(1));
|
||||
oneOf(blockSource).getBlock(messageId, 0);
|
||||
will(throwException(new DbException()));
|
||||
}});
|
||||
|
||||
InputStream in = new BlockSourceInputStream(MAX_BLOCK_LENGTH, executor,
|
||||
blockSource, messageId);
|
||||
//noinspection ResultOfMethodCallIgnored
|
||||
in.read();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadFullBlockAtEndOfMessage() throws Exception {
|
||||
testReadBlockAtEndOfMessage(MAX_BLOCK_LENGTH);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadPartialBlockAtEndOfMessage() throws Exception {
|
||||
testReadBlockAtEndOfMessage(MAX_BLOCK_LENGTH - 1);
|
||||
}
|
||||
|
||||
private void testReadBlockAtEndOfMessage(int blockLength) throws Exception {
|
||||
byte[] block = new byte[blockLength];
|
||||
random.nextBytes(block);
|
||||
|
||||
context.checking(new Expectations() {{
|
||||
oneOf(blockSource).getBlockCount(messageId);
|
||||
will(returnValue(1));
|
||||
oneOf(blockSource).getBlock(messageId, 0);
|
||||
will(returnValue(block));
|
||||
}});
|
||||
|
||||
InputStream in = new BlockSourceInputStream(MAX_BLOCK_LENGTH, executor,
|
||||
blockSource, messageId);
|
||||
byte[] buf = new byte[MAX_BLOCK_LENGTH * 2];
|
||||
assertEquals(block.length, in.read(buf, 0, buf.length));
|
||||
assertArrayEquals(block, copyOfRange(buf, 0, block.length));
|
||||
assertEquals(-1, in.read(buf, 0, buf.length));
|
||||
}
|
||||
|
||||
private byte[] createRandomData() {
|
||||
int length = random.nextInt(MAX_DATA_BYTES) + 1;
|
||||
byte[] data = new byte[length];
|
||||
random.nextBytes(data);
|
||||
return data;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
package org.briarproject.bramble.io;
|
||||
|
||||
import org.briarproject.bramble.api.io.BlockSource;
|
||||
import org.briarproject.bramble.api.sync.MessageId;
|
||||
|
||||
import static java.lang.System.arraycopy;
|
||||
|
||||
class ByteArrayBlockSource implements BlockSource {
|
||||
|
||||
private final byte[] data;
|
||||
private final int blockBytes;
|
||||
|
||||
ByteArrayBlockSource(byte[] data, int blockBytes) {
|
||||
this.data = data;
|
||||
this.blockBytes = blockBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getBlockCount(MessageId m) {
|
||||
return (data.length + blockBytes - 1) / blockBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getBlock(MessageId m, int blockNumber) {
|
||||
int offset = blockNumber * blockBytes;
|
||||
if (offset >= data.length) throw new IllegalArgumentException();
|
||||
int length = Math.min(blockBytes, data.length - offset);
|
||||
byte[] block = new byte[length];
|
||||
arraycopy(data, offset, block, 0, length);
|
||||
return block;
|
||||
}
|
||||
}
|
||||
@@ -4,8 +4,6 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.sync.GroupId;
|
||||
import org.briarproject.bramble.api.sync.Message;
|
||||
import org.briarproject.bramble.api.sync.MessageFactory;
|
||||
import org.briarproject.bramble.api.sync.MessageId;
|
||||
import org.briarproject.bramble.api.sync.tree.TreeHash;
|
||||
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
|
||||
|
||||
@@ -29,10 +27,4 @@ public class TestMessageFactory implements MessageFactory {
|
||||
System.arraycopy(body, 0, raw, MESSAGE_HEADER_LENGTH, body.length);
|
||||
return raw;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageId getMessageId(GroupId g, long timestamp,
|
||||
TreeHash rootHash) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,6 +9,8 @@ import org.briarproject.briar.api.messaging.PrivateMessage;
|
||||
import javax.annotation.Nullable;
|
||||
import javax.annotation.concurrent.Immutable;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
|
||||
@Immutable
|
||||
@NotNullByDefault
|
||||
public abstract class ThreadedMessage extends PrivateMessage {
|
||||
@@ -19,7 +21,7 @@ public abstract class ThreadedMessage extends PrivateMessage {
|
||||
|
||||
public ThreadedMessage(Message message, @Nullable MessageId parent,
|
||||
Author author) {
|
||||
super(message);
|
||||
super(message, emptyList());
|
||||
this.parent = parent;
|
||||
this.author = author;
|
||||
}
|
||||
|
||||
@@ -3,6 +3,8 @@ package org.briarproject.briar.api.messaging;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.sync.Message;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import javax.annotation.concurrent.Immutable;
|
||||
|
||||
@Immutable
|
||||
@@ -10,13 +12,18 @@ import javax.annotation.concurrent.Immutable;
|
||||
public class PrivateMessage {
|
||||
|
||||
private final Message message;
|
||||
private final List<AttachmentHeader> attachments;
|
||||
|
||||
public PrivateMessage(Message message) {
|
||||
public PrivateMessage(Message message, List<AttachmentHeader> attachments) {
|
||||
this.message = message;
|
||||
this.attachments = attachments;
|
||||
}
|
||||
|
||||
public Message getMessage() {
|
||||
return message;
|
||||
}
|
||||
|
||||
public List<AttachmentHeader> getAttachmentHeaders() {
|
||||
return attachments;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,13 +11,16 @@ import org.briarproject.bramble.api.data.BdfList;
|
||||
import org.briarproject.bramble.api.data.MetadataParser;
|
||||
import org.briarproject.bramble.api.db.DatabaseComponent;
|
||||
import org.briarproject.bramble.api.db.DbException;
|
||||
import org.briarproject.bramble.api.db.Metadata;
|
||||
import org.briarproject.bramble.api.db.Transaction;
|
||||
import org.briarproject.bramble.api.io.MessageInputStreamFactory;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
import org.briarproject.bramble.api.sync.Client;
|
||||
import org.briarproject.bramble.api.sync.Group;
|
||||
import org.briarproject.bramble.api.sync.Group.Visibility;
|
||||
import org.briarproject.bramble.api.sync.GroupId;
|
||||
import org.briarproject.bramble.api.sync.Message;
|
||||
import org.briarproject.bramble.api.sync.MessageFactory;
|
||||
import org.briarproject.bramble.api.sync.MessageId;
|
||||
import org.briarproject.bramble.api.sync.MessageStatus;
|
||||
import org.briarproject.bramble.api.versioning.ClientVersioningManager;
|
||||
@@ -32,16 +35,17 @@ import org.briarproject.briar.api.messaging.PrivateMessageHeader;
|
||||
import org.briarproject.briar.api.messaging.event.PrivateMessageReceivedEvent;
|
||||
import org.briarproject.briar.client.ConversationClientImpl;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
||||
import javax.annotation.concurrent.Immutable;
|
||||
import javax.inject.Inject;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_LENGTH;
|
||||
import static org.briarproject.briar.client.MessageTrackerConstants.MSG_KEY_READ;
|
||||
|
||||
@Immutable
|
||||
@@ -51,15 +55,21 @@ class MessagingManagerImpl extends ConversationClientImpl
|
||||
|
||||
private final ClientVersioningManager clientVersioningManager;
|
||||
private final ContactGroupFactory contactGroupFactory;
|
||||
private final MessageFactory messageFactory;
|
||||
private final MessageInputStreamFactory messageInputStreamFactory;
|
||||
|
||||
@Inject
|
||||
MessagingManagerImpl(DatabaseComponent db, ClientHelper clientHelper,
|
||||
ClientVersioningManager clientVersioningManager,
|
||||
MetadataParser metadataParser, MessageTracker messageTracker,
|
||||
ContactGroupFactory contactGroupFactory) {
|
||||
ContactGroupFactory contactGroupFactory,
|
||||
MessageFactory messageFactory,
|
||||
MessageInputStreamFactory messageInputStreamFactory) {
|
||||
super(db, clientHelper, metadataParser, messageTracker);
|
||||
this.clientVersioningManager = clientVersioningManager;
|
||||
this.contactGroupFactory = contactGroupFactory;
|
||||
this.messageFactory = messageFactory;
|
||||
this.messageInputStreamFactory = messageInputStreamFactory;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -142,9 +152,11 @@ class MessagingManagerImpl extends ConversationClientImpl
|
||||
meta.put("read", true);
|
||||
clientHelper.addLocalMessage(txn, m.getMessage(), meta, true);
|
||||
messageTracker.trackOutgoingMessage(txn, m.getMessage());
|
||||
for (AttachmentHeader h : m.getAttachmentHeaders())
|
||||
db.setMessageShared(txn, h.getMessageId());
|
||||
db.commitTransaction(txn);
|
||||
} catch (FormatException e) {
|
||||
throw new RuntimeException(e);
|
||||
throw new AssertionError(e);
|
||||
} finally {
|
||||
db.endTransaction(txn);
|
||||
}
|
||||
@@ -152,11 +164,16 @@ class MessagingManagerImpl extends ConversationClientImpl
|
||||
|
||||
@Override
|
||||
public AttachmentHeader addLocalAttachment(GroupId groupId, long timestamp,
|
||||
String contentType, ByteBuffer data) {
|
||||
// TODO add real implementation
|
||||
byte[] b = new byte[MessageId.LENGTH];
|
||||
new Random().nextBytes(b);
|
||||
return new AttachmentHeader(new MessageId(b), "image/png");
|
||||
String contentType, ByteBuffer data) throws DbException {
|
||||
// TODO: Remove this restriction when large messages are supported
|
||||
byte[] body = data.array();
|
||||
if (body.length > MAX_MESSAGE_LENGTH) throw new DbException();
|
||||
// TODO: Store message type and content type
|
||||
Message m = messageFactory.createMessage(groupId, timestamp, body);
|
||||
Metadata meta = new Metadata();
|
||||
// Attachment will be shared when private message is added
|
||||
db.transaction(false, txn -> db.addLocalMessage(txn, m, meta, false));
|
||||
return new AttachmentHeader(m.getId(), contentType);
|
||||
}
|
||||
|
||||
private ContactId getContactId(Transaction txn, GroupId g)
|
||||
@@ -236,8 +253,9 @@ class MessagingManagerImpl extends ConversationClientImpl
|
||||
|
||||
@Override
|
||||
public Attachment getAttachment(MessageId m) {
|
||||
// TODO add real implementation
|
||||
throw new IllegalStateException("Not yet implemented");
|
||||
InputStream in = messageInputStreamFactory.getMessageInputStream(m);
|
||||
// TODO: Read message type and content type
|
||||
return new Attachment(in);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -39,6 +39,6 @@ class PrivateMessageFactoryImpl implements PrivateMessageFactory {
|
||||
// Serialise the message
|
||||
BdfList message = BdfList.of(text);
|
||||
Message m = clientHelper.createMessage(groupId, timestamp, message);
|
||||
return new PrivateMessage(m);
|
||||
return new PrivateMessage(m, attachments);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import org.briarproject.bramble.data.DataModule;
|
||||
import org.briarproject.bramble.db.DatabaseModule;
|
||||
import org.briarproject.bramble.event.EventModule;
|
||||
import org.briarproject.bramble.identity.IdentityModule;
|
||||
import org.briarproject.bramble.io.IoModule;
|
||||
import org.briarproject.bramble.lifecycle.LifecycleModule;
|
||||
import org.briarproject.bramble.properties.PropertiesModule;
|
||||
import org.briarproject.bramble.record.RecordModule;
|
||||
@@ -50,6 +51,7 @@ import dagger.Component;
|
||||
GroupInvitationModule.class,
|
||||
IdentityModule.class,
|
||||
IntroductionModule.class,
|
||||
IoModule.class,
|
||||
LifecycleModule.class,
|
||||
MessagingModule.class,
|
||||
PrivateGroupModule.class,
|
||||
|
||||
@@ -8,6 +8,7 @@ import org.briarproject.bramble.data.DataModule;
|
||||
import org.briarproject.bramble.db.DatabaseModule;
|
||||
import org.briarproject.bramble.event.EventModule;
|
||||
import org.briarproject.bramble.identity.IdentityModule;
|
||||
import org.briarproject.bramble.io.IoModule;
|
||||
import org.briarproject.bramble.sync.SyncModule;
|
||||
import org.briarproject.bramble.sync.validation.ValidationModule;
|
||||
import org.briarproject.bramble.system.SystemModule;
|
||||
@@ -40,6 +41,7 @@ import dagger.Component;
|
||||
EventModule.class,
|
||||
ForumModule.class,
|
||||
IdentityModule.class,
|
||||
IoModule.class,
|
||||
MessagingModule.class,
|
||||
SyncModule.class,
|
||||
SystemModule.class,
|
||||
|
||||
@@ -15,6 +15,7 @@ import org.briarproject.bramble.data.DataModule;
|
||||
import org.briarproject.bramble.db.DatabaseModule;
|
||||
import org.briarproject.bramble.event.EventModule;
|
||||
import org.briarproject.bramble.identity.IdentityModule;
|
||||
import org.briarproject.bramble.io.IoModule;
|
||||
import org.briarproject.bramble.lifecycle.LifecycleModule;
|
||||
import org.briarproject.bramble.record.RecordModule;
|
||||
import org.briarproject.bramble.sync.SyncModule;
|
||||
@@ -48,6 +49,7 @@ import dagger.Component;
|
||||
DatabaseModule.class,
|
||||
EventModule.class,
|
||||
IdentityModule.class,
|
||||
IoModule.class,
|
||||
LifecycleModule.class,
|
||||
MessagingModule.class,
|
||||
RecordModule.class,
|
||||
|
||||
@@ -17,6 +17,7 @@ import org.briarproject.bramble.data.DataModule;
|
||||
import org.briarproject.bramble.db.DatabaseModule;
|
||||
import org.briarproject.bramble.event.EventModule;
|
||||
import org.briarproject.bramble.identity.IdentityModule;
|
||||
import org.briarproject.bramble.io.IoModule;
|
||||
import org.briarproject.bramble.lifecycle.LifecycleModule;
|
||||
import org.briarproject.bramble.properties.PropertiesModule;
|
||||
import org.briarproject.bramble.record.RecordModule;
|
||||
@@ -68,6 +69,7 @@ import dagger.Component;
|
||||
GroupInvitationModule.class,
|
||||
IdentityModule.class,
|
||||
IntroductionModule.class,
|
||||
IoModule.class,
|
||||
LifecycleModule.class,
|
||||
MessagingModule.class,
|
||||
PrivateGroupModule.class,
|
||||
|
||||
@@ -62,7 +62,7 @@ internal class MessagingControllerImplTest : ControllerTest() {
|
||||
emptyList()
|
||||
)
|
||||
private val sessionId = SessionId(getRandomId())
|
||||
private val privateMessage = PrivateMessage(message)
|
||||
private val privateMessage = PrivateMessage(message, emptyList())
|
||||
|
||||
@Test
|
||||
fun list() {
|
||||
|
||||
Reference in New Issue
Block a user