Remove raw messages from DB interface.

This commit is contained in:
akwizgran
2018-08-24 16:47:53 +01:00
parent 5626f3d761
commit 48933637d8
10 changed files with 49 additions and 92 deletions

View File

@@ -151,13 +151,13 @@ public interface DatabaseComponent {
throws DbException; throws DbException;
/** /**
* Returns a batch of raw messages for the given contact, with a total * Returns a batch of messages for the given contact, with a total length
* length less than or equal to the given length, for transmission over a * less than or equal to the given length, for transmission over a
* transport with the given maximum latency. Returns null if there are no * transport with the given maximum latency. Returns null if there are no
* sendable messages that fit in the given length. * sendable messages that fit in the given length.
*/ */
@Nullable @Nullable
Collection<byte[]> generateBatch(Transaction txn, ContactId c, Collection<Message> generateBatch(Transaction txn, ContactId c,
int maxLength, int maxLatency) throws DbException; int maxLength, int maxLatency) throws DbException;
/** /**
@@ -178,14 +178,14 @@ public interface DatabaseComponent {
throws DbException; throws DbException;
/** /**
* Returns a batch of raw messages for the given contact, with a total * Returns a batch of messages for the given contact, with a total length
* length less than or equal to the given length, for transmission over a * less than or equal to the given length, for transmission over a
* transport with the given maximum latency. Only messages that have been * transport with the given maximum latency. Only messages that have been
* requested by the contact are returned. Returns null if there are no * requested by the contact are returned. Returns null if there are no
* sendable messages that fit in the given length. * sendable messages that fit in the given length.
*/ */
@Nullable @Nullable
Collection<byte[]> generateRequestedBatch(Transaction txn, ContactId c, Collection<Message> generateRequestedBatch(Transaction txn, ContactId c,
int maxLength, int maxLatency) throws DbException; int maxLength, int maxLatency) throws DbException;
/** /**

View File

@@ -474,15 +474,6 @@ interface Database<T> {
*/ */
long getNextSendTime(T txn, ContactId c) throws DbException; long getNextSendTime(T txn, ContactId c) throws DbException;
/**
* Returns the message with the given ID, in serialised form.
* <p/>
* Read-only.
*
* @throws MessageDeletedException if the message has been deleted
*/
byte[] getRawMessage(T txn, MessageId m) throws DbException;
/** /**
* Returns the IDs of some messages that are eligible to be sent to the * Returns the IDs of some messages that are eligible to be sent to the
* given contact and have been requested by the contact, up to the given * given contact and have been requested by the contact, up to the given

View File

@@ -307,16 +307,16 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
@Nullable @Nullable
@Override @Override
public Collection<byte[]> generateBatch(Transaction transaction, public Collection<Message> generateBatch(Transaction transaction,
ContactId c, int maxLength, int maxLatency) throws DbException { ContactId c, int maxLength, int maxLatency) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException(); if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction); T txn = unbox(transaction);
if (!db.containsContact(txn, c)) if (!db.containsContact(txn, c))
throw new NoSuchContactException(); throw new NoSuchContactException();
Collection<MessageId> ids = db.getMessagesToSend(txn, c, maxLength); Collection<MessageId> ids = db.getMessagesToSend(txn, c, maxLength);
List<byte[]> messages = new ArrayList<>(ids.size()); List<Message> messages = new ArrayList<>(ids.size());
for (MessageId m : ids) { for (MessageId m : ids) {
messages.add(db.getRawMessage(txn, m)); messages.add(db.getMessage(txn, m));
db.updateExpiryTime(txn, c, m, maxLatency); db.updateExpiryTime(txn, c, m, maxLatency);
} }
if (ids.isEmpty()) return null; if (ids.isEmpty()) return null;
@@ -356,7 +356,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
@Nullable @Nullable
@Override @Override
public Collection<byte[]> generateRequestedBatch(Transaction transaction, public Collection<Message> generateRequestedBatch(Transaction transaction,
ContactId c, int maxLength, int maxLatency) throws DbException { ContactId c, int maxLength, int maxLatency) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException(); if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction); T txn = unbox(transaction);
@@ -364,9 +364,9 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
throw new NoSuchContactException(); throw new NoSuchContactException();
Collection<MessageId> ids = db.getRequestedMessagesToSend(txn, c, Collection<MessageId> ids = db.getRequestedMessagesToSend(txn, c,
maxLength); maxLength);
List<byte[]> messages = new ArrayList<>(ids.size()); List<Message> messages = new ArrayList<>(ids.size());
for (MessageId m : ids) { for (MessageId m : ids) {
messages.add(db.getRawMessage(txn, m)); messages.add(db.getMessage(txn, m));
db.updateExpiryTime(txn, c, m, maxLatency); db.updateExpiryTime(txn, c, m, maxLatency);
} }
if (ids.isEmpty()) return null; if (ids.isEmpty()) return null;

View File

@@ -2045,30 +2045,6 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
} }
@Override
public byte[] getRawMessage(Connection txn, MessageId m)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT raw FROM messages WHERE messageId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
rs = ps.executeQuery();
if (!rs.next()) throw new DbStateException();
byte[] raw = rs.getBytes(1);
if (rs.next()) throw new DbStateException();
rs.close();
ps.close();
if (raw == null) throw new MessageDeletedException();
return raw;
} catch (SQLException e) {
tryToClose(rs);
tryToClose(ps);
throw new DbException(e);
}
}
@Override @Override
public Collection<MessageId> getRequestedMessagesToSend(Connection txn, public Collection<MessageId> getRequestedMessagesToSend(Connection txn,
ContactId c, int maxLength) throws DbException { ContactId c, int maxLength) throws DbException {

View File

@@ -13,6 +13,7 @@ import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent; import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.Offer; import org.briarproject.bramble.api.sync.Offer;
import org.briarproject.bramble.api.sync.Request; import org.briarproject.bramble.api.sync.Request;
import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.sync.SyncRecordWriter;
@@ -274,7 +275,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
if (!generateBatchQueued.getAndSet(false)) if (!generateBatchQueued.getAndSet(false))
throw new AssertionError(); throw new AssertionError();
try { try {
Collection<byte[]> b; Collection<Message> b;
Transaction txn = db.startTransaction(false); Transaction txn = db.startTransaction(false);
try { try {
b = db.generateRequestedBatch(txn, contactId, b = db.generateRequestedBatch(txn, contactId,
@@ -296,9 +297,9 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
private class WriteBatch implements ThrowingRunnable<IOException> { private class WriteBatch implements ThrowingRunnable<IOException> {
private final Collection<byte[]> batch; private final Collection<Message> batch;
private WriteBatch(Collection<byte[]> batch) { private WriteBatch(Collection<Message> batch) {
this.batch = batch; this.batch = batch;
} }
@@ -306,7 +307,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
@Override @Override
public void run() throws IOException { public void run() throws IOException {
if (interrupted) return; if (interrupted) return;
for (byte[] raw : batch) recordWriter.writeMessage(raw); for (Message m : batch) recordWriter.writeMessage(m.getRaw());
LOG.info("Sent batch"); LOG.info("Sent batch");
generateBatch(); generateBatch();
} }

View File

@@ -13,6 +13,7 @@ import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent; import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.sync.SyncRecordWriter;
import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.SyncSession;
import org.briarproject.bramble.api.transport.StreamWriter; import org.briarproject.bramble.api.transport.StreamWriter;
@@ -171,7 +172,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
public void run() { public void run() {
if (interrupted) return; if (interrupted) return;
try { try {
Collection<byte[]> b; Collection<Message> b;
Transaction txn = db.startTransaction(false); Transaction txn = db.startTransaction(false);
try { try {
b = db.generateBatch(txn, contactId, b = db.generateBatch(txn, contactId,
@@ -193,9 +194,9 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
private class WriteBatch implements ThrowingRunnable<IOException> { private class WriteBatch implements ThrowingRunnable<IOException> {
private final Collection<byte[]> batch; private final Collection<Message> batch;
private WriteBatch(Collection<byte[]> batch) { private WriteBatch(Collection<Message> batch) {
this.batch = batch; this.batch = batch;
} }
@@ -203,7 +204,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
@Override @Override
public void run() throws IOException { public void run() throws IOException {
if (interrupted) return; if (interrupted) return;
for (byte[] raw : batch) recordWriter.writeMessage(raw); for (Message m : batch) recordWriter.writeMessage(m.getRaw());
LOG.info("Sent batch"); LOG.info("Sent batch");
dbExecutor.execute(new GenerateBatch()); dbExecutor.execute(new GenerateBatch());
} }

View File

@@ -99,9 +99,8 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
private final Group group; private final Group group;
private final Author author; private final Author author;
private final LocalAuthor localAuthor; private final LocalAuthor localAuthor;
private final Message message; private final Message message, message1;
private final MessageId messageId, messageId1; private final MessageId messageId, messageId1;
private final byte[] raw, raw1;
private final Metadata metadata; private final Metadata metadata;
private final TransportId transportId; private final TransportId transportId;
private final int maxLatency; private final int maxLatency;
@@ -117,11 +116,9 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
author = getAuthor(); author = getAuthor();
localAuthor = getLocalAuthor(); localAuthor = getLocalAuthor();
message = getMessage(groupId); message = getMessage(groupId);
Message message1 = getMessage(groupId); message1 = getMessage(groupId);
messageId = message.getId(); messageId = message.getId();
messageId1 = message1.getId(); messageId1 = message1.getId();
raw = message.getRaw();
raw1 = message1.getRaw();
metadata = new Metadata(); metadata = new Metadata();
metadata.put("foo", new byte[] {'b', 'a', 'r'}); metadata.put("foo", new byte[] {'b', 'a', 'r'});
transportId = getTransportId(); transportId = getTransportId();
@@ -867,7 +864,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
@Test @Test
public void testGenerateBatch() throws Exception { public void testGenerateBatch() throws Exception {
Collection<MessageId> ids = Arrays.asList(messageId, messageId1); Collection<MessageId> ids = Arrays.asList(messageId, messageId1);
Collection<byte[]> messages = Arrays.asList(raw, raw1); Collection<Message> messages = Arrays.asList(message, message1);
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(database).startTransaction(); oneOf(database).startTransaction();
will(returnValue(txn)); will(returnValue(txn));
@@ -876,12 +873,12 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
oneOf(database).getMessagesToSend(txn, contactId, oneOf(database).getMessagesToSend(txn, contactId,
MAX_MESSAGE_LENGTH * 2); MAX_MESSAGE_LENGTH * 2);
will(returnValue(ids)); will(returnValue(ids));
oneOf(database).getRawMessage(txn, messageId); oneOf(database).getMessage(txn, messageId);
will(returnValue(raw)); will(returnValue(message));
oneOf(database).updateExpiryTime(txn, contactId, messageId, oneOf(database).updateExpiryTime(txn, contactId, messageId,
maxLatency); maxLatency);
oneOf(database).getRawMessage(txn, messageId1); oneOf(database).getMessage(txn, messageId1);
will(returnValue(raw1)); will(returnValue(message1));
oneOf(database).updateExpiryTime(txn, contactId, messageId1, oneOf(database).updateExpiryTime(txn, contactId, messageId1,
maxLatency); maxLatency);
oneOf(database).lowerRequestedFlag(txn, contactId, ids); oneOf(database).lowerRequestedFlag(txn, contactId, ids);
@@ -963,7 +960,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
@Test @Test
public void testGenerateRequestedBatch() throws Exception { public void testGenerateRequestedBatch() throws Exception {
Collection<MessageId> ids = Arrays.asList(messageId, messageId1); Collection<MessageId> ids = Arrays.asList(messageId, messageId1);
Collection<byte[]> messages = Arrays.asList(raw, raw1); Collection<Message> messages = Arrays.asList(message, message1);
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(database).startTransaction(); oneOf(database).startTransaction();
will(returnValue(txn)); will(returnValue(txn));
@@ -972,12 +969,12 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
oneOf(database).getRequestedMessagesToSend(txn, contactId, oneOf(database).getRequestedMessagesToSend(txn, contactId,
MAX_MESSAGE_LENGTH * 2); MAX_MESSAGE_LENGTH * 2);
will(returnValue(ids)); will(returnValue(ids));
oneOf(database).getRawMessage(txn, messageId); oneOf(database).getMessage(txn, messageId);
will(returnValue(raw)); will(returnValue(message));
oneOf(database).updateExpiryTime(txn, contactId, messageId, oneOf(database).updateExpiryTime(txn, contactId, messageId,
maxLatency); maxLatency);
oneOf(database).getRawMessage(txn, messageId1); oneOf(database).getMessage(txn, messageId1);
will(returnValue(raw1)); will(returnValue(message1));
oneOf(database).updateExpiryTime(txn, contactId, messageId1, oneOf(database).updateExpiryTime(txn, contactId, messageId1,
maxLatency); maxLatency);
oneOf(database).lowerRequestedFlag(txn, contactId, ids); oneOf(database).lowerRequestedFlag(txn, contactId, ids);

View File

@@ -506,11 +506,11 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase {
} }
@Test @Test
public void testGetRawMessage() throws Exception { public void testGetMessage() throws Exception {
String name = "getRawMessage(T, MessageId)"; String name = "getMessage(T, MessageId)";
benchmark(name, db -> { benchmark(name, db -> {
Connection txn = db.startTransaction(); Connection txn = db.startTransaction();
db.getRawMessage(txn, pickRandom(messages).getId()); db.getMessage(txn, pickRandom(messages).getId());
db.commitTransaction(txn); db.commitTransaction(txn);
}); });
} }

View File

@@ -144,7 +144,8 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
assertTrue(db.containsContact(txn, contactId)); assertTrue(db.containsContact(txn, contactId));
assertTrue(db.containsGroup(txn, groupId)); assertTrue(db.containsGroup(txn, groupId));
assertTrue(db.containsMessage(txn, messageId)); assertTrue(db.containsMessage(txn, messageId));
assertArrayEquals(message.getRaw(), db.getRawMessage(txn, messageId)); assertArrayEquals(message.getRaw(),
db.getMessage(txn, messageId).getRaw());
// Delete the records // Delete the records
db.removeMessage(txn, messageId); db.removeMessage(txn, messageId);
@@ -1645,9 +1646,6 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
assertEquals(message.getTimestamp(), m.getTimestamp()); assertEquals(message.getTimestamp(), m.getTimestamp());
assertArrayEquals(message.getRaw(), m.getRaw()); assertArrayEquals(message.getRaw(), m.getRaw());
// The raw message should be available
assertArrayEquals(message.getRaw(), db.getRawMessage(txn, messageId));
// Delete the message // Delete the message
db.deleteMessage(txn, messageId); db.deleteMessage(txn, messageId);
@@ -1668,14 +1666,6 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
// Expected // Expected
} }
// Requesting the raw message should throw an exception
try {
db.getRawMessage(txn, messageId);
fail();
} catch (MessageDeletedException expected) {
// Expected
}
db.commitTransaction(txn); db.commitTransaction(txn);
db.close(); db.close();
} }
@@ -1806,7 +1796,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
Connection txn = db.startTransaction(); Connection txn = db.startTransaction();
try { try {
// Ask for a nonexistent message - an exception should be thrown // Ask for a nonexistent message - an exception should be thrown
db.getRawMessage(txn, messageId); db.getMessage(txn, messageId);
fail(); fail();
} catch (DbException expected) { } catch (DbException expected) {
// It should be possible to abort the transaction without error // It should be possible to abort the transaction without error

View File

@@ -5,6 +5,8 @@ import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.Transaction; import org.briarproject.bramble.api.db.Transaction;
import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.GroupId;
import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.MessageId; import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.sync.SyncRecordWriter;
import org.briarproject.bramble.api.transport.StreamWriter; import org.briarproject.bramble.api.transport.StreamWriter;
@@ -13,12 +15,11 @@ import org.briarproject.bramble.test.ImmediateExecutor;
import org.jmock.Expectations; import org.jmock.Expectations;
import org.junit.Test; import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import static java.util.Collections.singletonList;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS; import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
import static org.briarproject.bramble.test.TestUtils.getRandomBytes; import static org.briarproject.bramble.test.TestUtils.getMessage;
import static org.briarproject.bramble.test.TestUtils.getRandomId; import static org.briarproject.bramble.test.TestUtils.getRandomId;
public class SimplexOutgoingSessionTest extends BrambleMockTestCase { public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
@@ -33,7 +34,8 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
private final Executor dbExecutor = new ImmediateExecutor(); private final Executor dbExecutor = new ImmediateExecutor();
private final ContactId contactId = new ContactId(234); private final ContactId contactId = new ContactId(234);
private final MessageId messageId = new MessageId(getRandomId()); private final Message message = getMessage(new GroupId(getRandomId()));
private final MessageId messageId = message.getId();
@Test @Test
public void testNothingToSend() throws Exception { public void testNothingToSend() throws Exception {
@@ -72,8 +74,7 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
@Test @Test
public void testSomethingToSend() throws Exception { public void testSomethingToSend() throws Exception {
Ack ack = new Ack(Collections.singletonList(messageId)); Ack ack = new Ack(singletonList(messageId));
byte[] raw = getRandomBytes(1234);
SimplexOutgoingSession session = new SimplexOutgoingSession(db, SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, MAX_LATENCY, streamWriter, dbExecutor, eventBus, contactId, MAX_LATENCY, streamWriter,
recordWriter); recordWriter);
@@ -98,10 +99,10 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
will(returnValue(msgTxn)); will(returnValue(msgTxn));
oneOf(db).generateBatch(with(msgTxn), with(contactId), oneOf(db).generateBatch(with(msgTxn), with(contactId),
with(any(int.class)), with(MAX_LATENCY)); with(any(int.class)), with(MAX_LATENCY));
will(returnValue(Arrays.asList(raw))); will(returnValue(singletonList(message)));
oneOf(db).commitTransaction(msgTxn); oneOf(db).commitTransaction(msgTxn);
oneOf(db).endTransaction(msgTxn); oneOf(db).endTransaction(msgTxn);
oneOf(recordWriter).writeMessage(raw); oneOf(recordWriter).writeMessage(message.getRaw());
// No more acks // No more acks
oneOf(db).startTransaction(false); oneOf(db).startTransaction(false);
will(returnValue(noAckTxn)); will(returnValue(noAckTxn));