Compare commits

...

4 Commits

Author SHA1 Message Date
akwizgran
5a25d77d15 Add migration that drops offers table. 2020-11-03 17:31:22 +00:00
akwizgran
79142b2e97 Return early before allocating list. 2020-11-03 17:31:22 +00:00
akwizgran
0b2c84c53b Rename events. 2020-11-03 17:31:21 +00:00
akwizgran
8cbd27c011 Broadcast message IDs to request instead of storing them. 2020-11-03 17:31:19 +00:00
14 changed files with 149 additions and 389 deletions

View File

@@ -0,0 +1,25 @@
package org.briarproject.bramble.api;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
/**
* An item that can be consumed once.
*/
@NotNullByDefault
public class Consumable<T> {
private final AtomicReference<T> reference;
public Consumable(T item) {
reference = new AtomicReference<>(item);
}
@Nullable
public T consume() {
return reference.getAndSet(null);
}
}

View File

@@ -181,14 +181,6 @@ public interface DatabaseComponent extends TransactionManager {
Offer generateOffer(Transaction txn, ContactId c, int maxMessages, Offer generateOffer(Transaction txn, ContactId c, int maxMessages,
int maxLatency, boolean small) throws DbException; int maxLatency, boolean small) throws DbException;
/**
* Returns a request for the given contact, or null if there are no
* messages to request.
*/
@Nullable
Request generateRequest(Transaction txn, ContactId c, int maxMessages)
throws DbException;
/** /**
* Returns a batch of messages for the given contact, with a total length * Returns a batch of messages for the given contact, with a total length
* less than or equal to the given length, for transmission over a * less than or equal to the given length, for transmission over a

View File

@@ -1,26 +0,0 @@
package org.briarproject.bramble.api.sync.event;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import javax.annotation.concurrent.Immutable;
/**
* An event that is broadcast when a message is offered by a contact and needs
* to be requested.
*/
@Immutable
@NotNullByDefault
public class MessageToRequestEvent extends Event {
private final ContactId contactId;
public MessageToRequestEvent(ContactId contactId) {
this.contactId = contactId;
}
public ContactId getContactId() {
return contactId;
}
}

View File

@@ -7,16 +7,16 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import javax.annotation.concurrent.Immutable; import javax.annotation.concurrent.Immutable;
/** /**
* An event that is broadcast when a message is received from, or offered by, a * An event that is broadcast when one or more messages are received from, or
* contact and needs to be acknowledged. * offered by, a contact and need to be acknowledged.
*/ */
@Immutable @Immutable
@NotNullByDefault @NotNullByDefault
public class MessageToAckEvent extends Event { public class MessagesToAckEvent extends Event {
private final ContactId contactId; private final ContactId contactId;
public MessageToAckEvent(ContactId contactId) { public MessagesToAckEvent(ContactId contactId) {
this.contactId = contactId; this.contactId = contactId;
} }

View File

@@ -0,0 +1,39 @@
package org.briarproject.bramble.api.sync.event;
import org.briarproject.bramble.api.Consumable;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.MessageId;
import java.util.Collection;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
/**
* An event that is broadcast when one or more messages are offered by a
* contact and need to be requested.
*/
@Immutable
@NotNullByDefault
public class MessagesToRequestEvent extends Event {
private final ContactId contactId;
private final Consumable<Collection<MessageId>> ids;
public MessagesToRequestEvent(ContactId contactId,
Collection<MessageId> ids) {
this.contactId = contactId;
this.ids = new Consumable<>(ids);
}
public ContactId getContactId() {
return contactId;
}
@Nullable
public Collection<MessageId> consumeIds() {
return ids.consume();
}
}

View File

@@ -126,11 +126,6 @@ interface Database<T> {
void addMessageDependency(T txn, Message dependent, MessageId dependency, void addMessageDependency(T txn, Message dependent, MessageId dependency,
MessageState dependentState) throws DbException; MessageState dependentState) throws DbException;
/**
* Records that a message has been offered by the given contact.
*/
void addOfferedMessage(T txn, ContactId c, MessageId m) throws DbException;
/** /**
* Stores a pending contact. * Stores a pending contact.
*/ */
@@ -219,13 +214,6 @@ interface Database<T> {
boolean containsVisibleMessage(T txn, ContactId c, MessageId m) boolean containsVisibleMessage(T txn, ContactId c, MessageId m)
throws DbException; throws DbException;
/**
* Returns the number of messages offered by the given contact.
* <p/>
* Read-only.
*/
int countOfferedMessages(T txn, ContactId c) throws DbException;
/** /**
* Deletes the message with the given ID. Unlike * Deletes the message with the given ID. Unlike
* {@link #removeMessage(Object, MessageId)}, the message ID and any other * {@link #removeMessage(Object, MessageId)}, the message ID and any other
@@ -467,15 +455,6 @@ interface Database<T> {
Collection<MessageId> getSmallMessagesToOffer(T txn, ContactId c, Collection<MessageId> getSmallMessagesToOffer(T txn, ContactId c,
int maxMessages, int maxLatency) throws DbException; int maxMessages, int maxLatency) throws DbException;
/**
* Returns the IDs of some messages that are eligible to be requested from
* the given contact, up to the given number of messages.
* <p/>
* Read-only.
*/
Collection<MessageId> getMessagesToRequest(T txn, ContactId c,
int maxMessages) throws DbException;
/** /**
* Returns the IDs of some messages that are eligible to be sent to the * Returns the IDs of some messages that are eligible to be sent to the
* given contact, up to the given total length. * given contact, up to the given total length.
@@ -656,13 +635,6 @@ interface Database<T> {
*/ */
void removeMessage(T txn, MessageId m) throws DbException; void removeMessage(T txn, MessageId m) throws DbException;
/**
* Removes the given offered messages that were offered by the given
* contact.
*/
void removeOfferedMessages(T txn, ContactId c,
Collection<MessageId> requested) throws DbException;
/** /**
* Removes a pending contact (and all associated state) from the database. * Removes a pending contact (and all associated state) from the database.
*/ */

View File

@@ -61,10 +61,10 @@ import org.briarproject.bramble.api.sync.event.MessageAddedEvent;
import org.briarproject.bramble.api.sync.event.MessageRequestedEvent; import org.briarproject.bramble.api.sync.event.MessageRequestedEvent;
import org.briarproject.bramble.api.sync.event.MessageSharedEvent; import org.briarproject.bramble.api.sync.event.MessageSharedEvent;
import org.briarproject.bramble.api.sync.event.MessageStateChangedEvent; import org.briarproject.bramble.api.sync.event.MessageStateChangedEvent;
import org.briarproject.bramble.api.sync.event.MessageToAckEvent;
import org.briarproject.bramble.api.sync.event.MessageToRequestEvent;
import org.briarproject.bramble.api.sync.event.MessagesAckedEvent; import org.briarproject.bramble.api.sync.event.MessagesAckedEvent;
import org.briarproject.bramble.api.sync.event.MessagesSentEvent; import org.briarproject.bramble.api.sync.event.MessagesSentEvent;
import org.briarproject.bramble.api.sync.event.MessagesToAckEvent;
import org.briarproject.bramble.api.sync.event.MessagesToRequestEvent;
import org.briarproject.bramble.api.sync.event.SyncVersionsUpdatedEvent; import org.briarproject.bramble.api.sync.event.SyncVersionsUpdatedEvent;
import org.briarproject.bramble.api.sync.validation.MessageState; import org.briarproject.bramble.api.sync.validation.MessageState;
import org.briarproject.bramble.api.transport.KeySetId; import org.briarproject.bramble.api.transport.KeySetId;
@@ -91,7 +91,6 @@ import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE;
import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED; import static org.briarproject.bramble.api.sync.Group.Visibility.SHARED;
import static org.briarproject.bramble.api.sync.validation.MessageState.DELIVERED; import static org.briarproject.bramble.api.sync.validation.MessageState.DELIVERED;
import static org.briarproject.bramble.api.sync.validation.MessageState.UNKNOWN; import static org.briarproject.bramble.api.sync.validation.MessageState.UNKNOWN;
import static org.briarproject.bramble.db.DatabaseConstants.MAX_OFFERED_MESSAGES;
import static org.briarproject.bramble.util.LogUtils.logDuration; import static org.briarproject.bramble.util.LogUtils.logDuration;
import static org.briarproject.bramble.util.LogUtils.logException; import static org.briarproject.bramble.util.LogUtils.logException;
import static org.briarproject.bramble.util.LogUtils.now; import static org.briarproject.bramble.util.LogUtils.now;
@@ -416,12 +415,12 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
if (small) if (small)
ids = db.getSmallMessagesToSend(txn, c, maxLength, maxLatency); ids = db.getSmallMessagesToSend(txn, c, maxLength, maxLatency);
else ids = db.getMessagesToSend(txn, c, maxLength, maxLatency); else ids = db.getMessagesToSend(txn, c, maxLength, maxLatency);
if (ids.isEmpty()) return null;
List<Message> messages = new ArrayList<>(ids.size()); List<Message> messages = new ArrayList<>(ids.size());
for (MessageId m : ids) { for (MessageId m : ids) {
messages.add(db.getSmallMessage(txn, m)); messages.add(db.getSmallMessage(txn, m));
db.updateExpiryTimeAndEta(txn, c, m, maxLatency); db.updateExpiryTimeAndEta(txn, c, m, maxLatency);
} }
if (ids.isEmpty()) return null;
db.lowerRequestedFlag(txn, c, ids); db.lowerRequestedFlag(txn, c, ids);
transaction.attach(new MessagesSentEvent(c, ids)); transaction.attach(new MessagesSentEvent(c, ids));
return messages; return messages;
@@ -445,21 +444,6 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
return new Offer(ids); return new Offer(ids);
} }
@Nullable
@Override
public Request generateRequest(Transaction transaction, ContactId c,
int maxMessages) throws DbException {
if (transaction.isReadOnly()) throw new IllegalArgumentException();
T txn = unbox(transaction);
if (!db.containsContact(txn, c))
throw new NoSuchContactException();
Collection<MessageId> ids =
db.getMessagesToRequest(txn, c, maxMessages);
if (ids.isEmpty()) return null;
db.removeOfferedMessages(txn, c, ids);
return new Request(ids);
}
@Nullable @Nullable
@Override @Override
public Collection<Message> generateRequestedBatch(Transaction transaction, public Collection<Message> generateRequestedBatch(Transaction transaction,
@@ -470,12 +454,12 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
throw new NoSuchContactException(); throw new NoSuchContactException();
Collection<MessageId> ids = Collection<MessageId> ids =
db.getRequestedMessagesToSend(txn, c, maxLength, maxLatency); db.getRequestedMessagesToSend(txn, c, maxLength, maxLatency);
if (ids.isEmpty()) return null;
List<Message> messages = new ArrayList<>(ids.size()); List<Message> messages = new ArrayList<>(ids.size());
for (MessageId m : ids) { for (MessageId m : ids) {
messages.add(db.getSmallMessage(txn, m)); messages.add(db.getSmallMessage(txn, m));
db.updateExpiryTimeAndEta(txn, c, m, maxLatency); db.updateExpiryTimeAndEta(txn, c, m, maxLatency);
} }
if (ids.isEmpty()) return null;
db.lowerRequestedFlag(txn, c, ids); db.lowerRequestedFlag(txn, c, ids);
transaction.attach(new MessagesSentEvent(c, ids)); transaction.attach(new MessagesSentEvent(c, ids));
return messages; return messages;
@@ -824,7 +808,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
db.addMessage(txn, m, UNKNOWN, false, false, c); db.addMessage(txn, m, UNKNOWN, false, false, c);
transaction.attach(new MessageAddedEvent(m, c)); transaction.attach(new MessageAddedEvent(m, c));
} }
transaction.attach(new MessageToAckEvent(c)); transaction.attach(new MessagesToAckEvent(c));
} }
} }
@@ -835,21 +819,20 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
T txn = unbox(transaction); T txn = unbox(transaction);
if (!db.containsContact(txn, c)) if (!db.containsContact(txn, c))
throw new NoSuchContactException(); throw new NoSuchContactException();
boolean ack = false, request = false; boolean ack = false;
int count = db.countOfferedMessages(txn, c); List<MessageId> request = new ArrayList<>(o.getMessageIds().size());
for (MessageId m : o.getMessageIds()) { for (MessageId m : o.getMessageIds()) {
if (db.containsVisibleMessage(txn, c, m)) { if (db.containsVisibleMessage(txn, c, m)) {
db.raiseSeenFlag(txn, c, m); db.raiseSeenFlag(txn, c, m);
db.raiseAckFlag(txn, c, m); db.raiseAckFlag(txn, c, m);
ack = true; ack = true;
} else if (count < MAX_OFFERED_MESSAGES) { } else {
db.addOfferedMessage(txn, c, m); request.add(m);
request = true;
count++;
} }
} }
if (ack) transaction.attach(new MessageToAckEvent(c)); if (ack) transaction.attach(new MessagesToAckEvent(c));
if (request) transaction.attach(new MessageToRequestEvent(c)); if (!request.isEmpty())
transaction.attach(new MessagesToRequestEvent(c, request));
} }
@Override @Override

View File

@@ -6,13 +6,6 @@ import static java.util.concurrent.TimeUnit.DAYS;
interface DatabaseConstants { interface DatabaseConstants {
/**
* The maximum number of offered messages from each contact that will be
* stored. If offers arrive more quickly than requests can be sent and this
* limit is reached, additional offers will not be stored.
*/
int MAX_OFFERED_MESSAGES = 1000;
/** /**
* The namespace of the {@link Settings} where the database schema version * The namespace of the {@link Settings} where the database schema version
* is stored. * is stored.

View File

@@ -99,7 +99,7 @@ import static org.briarproject.bramble.util.LogUtils.now;
abstract class JdbcDatabase implements Database<Connection> { abstract class JdbcDatabase implements Database<Connection> {
// Package access for testing // Package access for testing
static final int CODE_SCHEMA_VERSION = 48; static final int CODE_SCHEMA_VERSION = 49;
// Time period offsets for incoming transport keys // Time period offsets for incoming transport keys
private static final int OFFSET_PREV = -1; private static final int OFFSET_PREV = -1;
@@ -220,15 +220,6 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " REFERENCES messages (messageId)" + " REFERENCES messages (messageId)"
+ " ON DELETE CASCADE)"; + " ON DELETE CASCADE)";
private static final String CREATE_OFFERS =
"CREATE TABLE offers"
+ " (messageId _HASH NOT NULL," // Not a foreign key
+ " contactId INT NOT NULL,"
+ " PRIMARY KEY (messageId, contactId),"
+ " FOREIGN KEY (contactId)"
+ " REFERENCES contacts (contactId)"
+ " ON DELETE CASCADE)";
private static final String CREATE_BLOCKS = private static final String CREATE_BLOCKS =
"CREATE TABLE blocks" "CREATE TABLE blocks"
+ " (messageId _HASH NOT NULL," + " (messageId _HASH NOT NULL,"
@@ -476,7 +467,8 @@ abstract class JdbcDatabase implements Database<Connection> {
new Migration44_45(), new Migration44_45(),
new Migration45_46(), new Migration45_46(),
new Migration46_47(dbTypes), new Migration46_47(dbTypes),
new Migration47_48(dbTypes) new Migration47_48(dbTypes),
new Migration48_49()
); );
} }
@@ -521,7 +513,6 @@ abstract class JdbcDatabase implements Database<Connection> {
s.executeUpdate(dbTypes.replaceTypes(CREATE_MESSAGES)); s.executeUpdate(dbTypes.replaceTypes(CREATE_MESSAGES));
s.executeUpdate(dbTypes.replaceTypes(CREATE_MESSAGE_METADATA)); s.executeUpdate(dbTypes.replaceTypes(CREATE_MESSAGE_METADATA));
s.executeUpdate(dbTypes.replaceTypes(CREATE_MESSAGE_DEPENDENCIES)); s.executeUpdate(dbTypes.replaceTypes(CREATE_MESSAGE_DEPENDENCIES));
s.executeUpdate(dbTypes.replaceTypes(CREATE_OFFERS));
s.executeUpdate(dbTypes.replaceTypes(CREATE_BLOCKS)); s.executeUpdate(dbTypes.replaceTypes(CREATE_BLOCKS));
s.executeUpdate(dbTypes.replaceTypes(CREATE_STATUSES)); s.executeUpdate(dbTypes.replaceTypes(CREATE_STATUSES));
s.executeUpdate(dbTypes.replaceTypes(CREATE_TRANSPORTS)); s.executeUpdate(dbTypes.replaceTypes(CREATE_TRANSPORTS));
@@ -753,9 +744,8 @@ abstract class JdbcDatabase implements Database<Connection> {
boolean messageShared = rs.getBoolean(4); boolean messageShared = rs.getBoolean(4);
int length = rs.getInt(5); int length = rs.getInt(5);
boolean deleted = rs.getBoolean(6); boolean deleted = rs.getBoolean(6);
boolean seen = removeOfferedMessage(txn, c, id);
addStatus(txn, id, c, g, timestamp, length, state, groupShared, addStatus(txn, id, c, g, timestamp, length, state, groupShared,
messageShared, deleted, seen); messageShared, deleted, false);
} }
rs.close(); rs.close();
ps.close(); ps.close();
@@ -830,8 +820,7 @@ abstract class JdbcDatabase implements Database<Connection> {
getGroupVisibility(txn, m.getGroupId()); getGroupVisibility(txn, m.getGroupId());
for (Entry<ContactId, Boolean> e : visibility.entrySet()) { for (Entry<ContactId, Boolean> e : visibility.entrySet()) {
ContactId c = e.getKey(); ContactId c = e.getKey();
boolean offered = removeOfferedMessage(txn, c, m.getId()); boolean seen = c.equals(sender);
boolean seen = offered || c.equals(sender);
addStatus(txn, m.getId(), c, m.getGroupId(), m.getTimestamp(), addStatus(txn, m.getId(), c, m.getGroupId(), m.getTimestamp(),
m.getRawLength(), state, e.getValue(), shared, false, m.getRawLength(), state, e.getValue(), shared, false,
seen); seen);
@@ -853,37 +842,6 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
} }
@Override
public void addOfferedMessage(Connection txn, ContactId c, MessageId m)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT NULL FROM offers"
+ " WHERE messageId = ? AND contactId = ?";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
ps.setInt(2, c.getInt());
rs = ps.executeQuery();
boolean found = rs.next();
if (rs.next()) throw new DbStateException();
rs.close();
ps.close();
if (found) return;
sql = "INSERT INTO offers (messageId, contactId) VALUES (?, ?)";
ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes());
ps.setInt(2, c.getInt());
int affected = ps.executeUpdate();
if (affected != 1) throw new DbStateException();
ps.close();
} catch (SQLException e) {
tryToClose(rs, LOG, WARNING);
tryToClose(ps, LOG, WARNING);
throw new DbException(e);
}
}
private void addStatus(Connection txn, MessageId m, ContactId c, GroupId g, private void addStatus(Connection txn, MessageId m, ContactId c, GroupId g,
long timestamp, int length, MessageState state, boolean groupShared, long timestamp, int length, MessageState state, boolean groupShared,
boolean messageShared, boolean deleted, boolean seen) boolean messageShared, boolean deleted, boolean seen)
@@ -1285,30 +1243,6 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
} }
@Override
public int countOfferedMessages(Connection txn, ContactId c)
throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT COUNT (messageId) FROM offers "
+ " WHERE contactId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
rs = ps.executeQuery();
if (!rs.next()) throw new DbException();
int count = rs.getInt(1);
if (rs.next()) throw new DbException();
rs.close();
ps.close();
return count;
} catch (SQLException e) {
tryToClose(rs, LOG, WARNING);
tryToClose(ps, LOG, WARNING);
throw new DbException(e);
}
}
@Override @Override
public void deleteMessage(Connection txn, MessageId m) throws DbException { public void deleteMessage(Connection txn, MessageId m) throws DbException {
PreparedStatement ps = null; PreparedStatement ps = null;
@@ -2177,31 +2111,6 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
} }
@Override
public Collection<MessageId> getMessagesToRequest(Connection txn,
ContactId c, int maxMessages) throws DbException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = "SELECT messageId FROM offers"
+ " WHERE contactId = ?"
+ " LIMIT ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setInt(2, maxMessages);
rs = ps.executeQuery();
List<MessageId> ids = new ArrayList<>();
while (rs.next()) ids.add(new MessageId(rs.getBytes(1)));
rs.close();
ps.close();
return ids;
} catch (SQLException e) {
tryToClose(rs, LOG, WARNING);
tryToClose(ps, LOG, WARNING);
throw new DbException(e);
}
}
@Override @Override
public Collection<MessageId> getMessagesToSend(Connection txn, ContactId c, public Collection<MessageId> getMessagesToSend(Connection txn, ContactId c,
int maxLength, int maxLatency) throws DbException { int maxLength, int maxLatency) throws DbException {
@@ -3005,50 +2914,6 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
} }
private boolean removeOfferedMessage(Connection txn, ContactId c,
MessageId m) throws DbException {
PreparedStatement ps = null;
try {
String sql = "DELETE FROM offers"
+ " WHERE contactId = ? AND messageId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
ps.setBytes(2, m.getBytes());
int affected = ps.executeUpdate();
if (affected < 0 || affected > 1) throw new DbStateException();
ps.close();
return affected == 1;
} catch (SQLException e) {
tryToClose(ps, LOG, WARNING);
throw new DbException(e);
}
}
@Override
public void removeOfferedMessages(Connection txn, ContactId c,
Collection<MessageId> requested) throws DbException {
PreparedStatement ps = null;
try {
String sql = "DELETE FROM offers"
+ " WHERE contactId = ? AND messageId = ?";
ps = txn.prepareStatement(sql);
ps.setInt(1, c.getInt());
for (MessageId m : requested) {
ps.setBytes(2, m.getBytes());
ps.addBatch();
}
int[] batchAffected = ps.executeBatch();
if (batchAffected.length != requested.size())
throw new DbStateException();
for (int rows : batchAffected)
if (rows != 1) throw new DbStateException();
ps.close();
} catch (SQLException e) {
tryToClose(ps, LOG, WARNING);
throw new DbException(e);
}
}
@Override @Override
public void removePendingContact(Connection txn, PendingContactId p) public void removePendingContact(Connection txn, PendingContactId p)
throws DbException { throws DbException {

View File

@@ -0,0 +1,40 @@
package org.briarproject.bramble.db;
import org.briarproject.bramble.api.db.DbException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.logging.Logger;
import static java.util.logging.Level.WARNING;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.db.JdbcUtils.tryToClose;
class Migration48_49 implements Migration<Connection> {
private static final Logger LOG = getLogger(Migration48_49.class.getName());
@Override
public int getStartVersion() {
return 48;
}
@Override
public int getEndVersion() {
return 49;
}
@Override
public void migrate(Connection txn) throws DbException {
Statement s = null;
try {
s = txn.createStatement();
s.execute("DROP TABLE offers");
s.close();
} catch (SQLException e) {
tryToClose(s, LOG, WARNING);
throw new DbException(e);
}
}
}

View File

@@ -15,6 +15,7 @@ import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.plugin.event.TransportInactiveEvent; import org.briarproject.bramble.api.plugin.event.TransportInactiveEvent;
import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.Message; import org.briarproject.bramble.api.sync.Message;
import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.Offer; import org.briarproject.bramble.api.sync.Offer;
import org.briarproject.bramble.api.sync.Priority; import org.briarproject.bramble.api.sync.Priority;
import org.briarproject.bramble.api.sync.Request; import org.briarproject.bramble.api.sync.Request;
@@ -25,8 +26,8 @@ import org.briarproject.bramble.api.sync.event.CloseSyncConnectionsEvent;
import org.briarproject.bramble.api.sync.event.GroupVisibilityUpdatedEvent; import org.briarproject.bramble.api.sync.event.GroupVisibilityUpdatedEvent;
import org.briarproject.bramble.api.sync.event.MessageRequestedEvent; import org.briarproject.bramble.api.sync.event.MessageRequestedEvent;
import org.briarproject.bramble.api.sync.event.MessageSharedEvent; import org.briarproject.bramble.api.sync.event.MessageSharedEvent;
import org.briarproject.bramble.api.sync.event.MessageToAckEvent; import org.briarproject.bramble.api.sync.event.MessagesToAckEvent;
import org.briarproject.bramble.api.sync.event.MessageToRequestEvent; import org.briarproject.bramble.api.sync.event.MessagesToRequestEvent;
import org.briarproject.bramble.api.system.Clock; import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.api.transport.StreamWriter; import org.briarproject.bramble.api.transport.StreamWriter;
@@ -87,8 +88,6 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
private final AtomicBoolean generateAckQueued = new AtomicBoolean(false); private final AtomicBoolean generateAckQueued = new AtomicBoolean(false);
private final AtomicBoolean generateBatchQueued = new AtomicBoolean(false); private final AtomicBoolean generateBatchQueued = new AtomicBoolean(false);
private final AtomicBoolean generateOfferQueued = new AtomicBoolean(false); private final AtomicBoolean generateOfferQueued = new AtomicBoolean(false);
private final AtomicBoolean generateRequestQueued =
new AtomicBoolean(false);
private final AtomicLong nextSendTime = new AtomicLong(Long.MAX_VALUE); private final AtomicLong nextSendTime = new AtomicLong(Long.MAX_VALUE);
private volatile boolean interrupted = false; private volatile boolean interrupted = false;
@@ -125,7 +124,6 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
generateAck(); generateAck();
generateBatch(); generateBatch();
generateOffer(); generateOffer();
generateRequest();
long now = clock.currentTimeMillis(); long now = clock.currentTimeMillis();
long nextKeepalive = now + maxIdleTime; long nextKeepalive = now + maxIdleTime;
boolean dataToFlush = true; boolean dataToFlush = true;
@@ -197,11 +195,6 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
dbExecutor.execute(new GenerateOffer()); dbExecutor.execute(new GenerateOffer());
} }
private void generateRequest() {
if (generateRequestQueued.compareAndSet(false, true))
dbExecutor.execute(new GenerateRequest());
}
private void setNextSendTime(long time) { private void setNextSendTime(long time) {
long old = nextSendTime.getAndSet(time); long old = nextSendTime.getAndSet(time);
if (time < old) writerTasks.add(NEXT_SEND_TIME_DECREASED); if (time < old) writerTasks.add(NEXT_SEND_TIME_DECREASED);
@@ -227,12 +220,16 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
} else if (e instanceof MessageRequestedEvent) { } else if (e instanceof MessageRequestedEvent) {
if (((MessageRequestedEvent) e).getContactId().equals(contactId)) if (((MessageRequestedEvent) e).getContactId().equals(contactId))
generateBatch(); generateBatch();
} else if (e instanceof MessageToAckEvent) { } else if (e instanceof MessagesToAckEvent) {
if (((MessageToAckEvent) e).getContactId().equals(contactId)) if (((MessagesToAckEvent) e).getContactId().equals(contactId))
generateAck(); generateAck();
} else if (e instanceof MessageToRequestEvent) { } else if (e instanceof MessagesToRequestEvent) {
if (((MessageToRequestEvent) e).getContactId().equals(contactId)) MessagesToRequestEvent m = (MessagesToRequestEvent) e;
generateRequest(); if (m.getContactId().equals(contactId)) {
Collection<MessageId> ids = m.consumeIds();
if (ids != null)
writerTasks.add(new WriteRequest(new Request(ids)));
}
} else if (e instanceof LifecycleEvent) { } else if (e instanceof LifecycleEvent) {
LifecycleEvent l = (LifecycleEvent) e; LifecycleEvent l = (LifecycleEvent) e;
if (l.getLifecycleState() == STOPPING) interrupt(); if (l.getLifecycleState() == STOPPING) interrupt();
@@ -372,27 +369,6 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
} }
} }
private class GenerateRequest implements Runnable {
@DatabaseExecutor
@Override
public void run() {
if (interrupted) return;
if (!generateRequestQueued.getAndSet(false))
throw new AssertionError();
try {
Request r = db.transactionWithNullableResult(false, txn ->
db.generateRequest(txn, contactId, MAX_MESSAGE_IDS));
if (LOG.isLoggable(INFO))
LOG.info("Generated request: " + (r != null));
if (r != null) writerTasks.add(new WriteRequest(r));
} catch (DbException e) {
logException(LOG, WARNING, e);
interrupt();
}
}
}
private class WriteRequest implements ThrowingRunnable<IOException> { private class WriteRequest implements ThrowingRunnable<IOException> {
private final Request request; private final Request request;
@@ -407,7 +383,6 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
if (interrupted) return; if (interrupted) return;
recordWriter.writeRequest(request); recordWriter.writeRequest(request);
LOG.info("Sent request"); LOG.info("Sent request");
generateRequest();
} }
} }
} }

View File

@@ -44,10 +44,10 @@ import org.briarproject.bramble.api.sync.event.MessageAddedEvent;
import org.briarproject.bramble.api.sync.event.MessageRequestedEvent; import org.briarproject.bramble.api.sync.event.MessageRequestedEvent;
import org.briarproject.bramble.api.sync.event.MessageSharedEvent; import org.briarproject.bramble.api.sync.event.MessageSharedEvent;
import org.briarproject.bramble.api.sync.event.MessageStateChangedEvent; import org.briarproject.bramble.api.sync.event.MessageStateChangedEvent;
import org.briarproject.bramble.api.sync.event.MessageToAckEvent;
import org.briarproject.bramble.api.sync.event.MessageToRequestEvent;
import org.briarproject.bramble.api.sync.event.MessagesAckedEvent; import org.briarproject.bramble.api.sync.event.MessagesAckedEvent;
import org.briarproject.bramble.api.sync.event.MessagesSentEvent; import org.briarproject.bramble.api.sync.event.MessagesSentEvent;
import org.briarproject.bramble.api.sync.event.MessagesToAckEvent;
import org.briarproject.bramble.api.sync.event.MessagesToRequestEvent;
import org.briarproject.bramble.api.transport.IncomingKeys; import org.briarproject.bramble.api.transport.IncomingKeys;
import org.briarproject.bramble.api.transport.KeySetId; import org.briarproject.bramble.api.transport.KeySetId;
import org.briarproject.bramble.api.transport.OutgoingKeys; import org.briarproject.bramble.api.transport.OutgoingKeys;
@@ -76,7 +76,6 @@ import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_LENGTH
import static org.briarproject.bramble.api.sync.validation.MessageState.DELIVERED; import static org.briarproject.bramble.api.sync.validation.MessageState.DELIVERED;
import static org.briarproject.bramble.api.sync.validation.MessageState.UNKNOWN; import static org.briarproject.bramble.api.sync.validation.MessageState.UNKNOWN;
import static org.briarproject.bramble.api.transport.TransportConstants.REORDERING_WINDOW_SIZE; import static org.briarproject.bramble.api.transport.TransportConstants.REORDERING_WINDOW_SIZE;
import static org.briarproject.bramble.db.DatabaseConstants.MAX_OFFERED_MESSAGES;
import static org.briarproject.bramble.test.TestUtils.getAgreementPrivateKey; import static org.briarproject.bramble.test.TestUtils.getAgreementPrivateKey;
import static org.briarproject.bramble.test.TestUtils.getAgreementPublicKey; import static org.briarproject.bramble.test.TestUtils.getAgreementPublicKey;
import static org.briarproject.bramble.test.TestUtils.getAuthor; import static org.briarproject.bramble.test.TestUtils.getAuthor;
@@ -295,11 +294,11 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
throws Exception { throws Exception {
context.checking(new Expectations() {{ context.checking(new Expectations() {{
// Check whether the contact is in the DB (which it's not) // Check whether the contact is in the DB (which it's not)
exactly(18).of(database).startTransaction(); exactly(17).of(database).startTransaction();
will(returnValue(txn)); will(returnValue(txn));
exactly(18).of(database).containsContact(txn, contactId); exactly(17).of(database).containsContact(txn, contactId);
will(returnValue(false)); will(returnValue(false));
exactly(18).of(database).abortTransaction(txn); exactly(17).of(database).abortTransaction(txn);
}}); }});
DatabaseComponent db = createDatabaseComponent(database, eventBus, DatabaseComponent db = createDatabaseComponent(database, eventBus,
eventExecutor, shutdownManager); eventExecutor, shutdownManager);
@@ -337,14 +336,6 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
// Expected // Expected
} }
try {
db.transaction(false, transaction ->
db.generateRequest(transaction, contactId, 123));
fail();
} catch (NoSuchContactException expected) {
// Expected
}
try { try {
db.transaction(false, transaction -> db.transaction(false, transaction ->
db.getContact(transaction, contactId)); db.getContact(transaction, contactId));
@@ -917,30 +908,6 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
}); });
} }
@Test
public void testGenerateRequest() throws Exception {
MessageId messageId1 = new MessageId(getRandomId());
Collection<MessageId> ids = asList(messageId, messageId1);
context.checking(new Expectations() {{
oneOf(database).startTransaction();
will(returnValue(txn));
oneOf(database).containsContact(txn, contactId);
will(returnValue(true));
oneOf(database).getMessagesToRequest(txn, contactId, 123);
will(returnValue(ids));
oneOf(database).removeOfferedMessages(txn, contactId, ids);
oneOf(database).commitTransaction(txn);
}});
DatabaseComponent db = createDatabaseComponent(database, eventBus,
eventExecutor, shutdownManager);
db.transaction(false, transaction -> {
Request r = db.generateRequest(transaction, contactId, 123);
assertNotNull(r);
assertEquals(ids, r.getMessageIds());
});
}
@Test @Test
public void testGenerateRequestedBatch() throws Exception { public void testGenerateRequestedBatch() throws Exception {
Collection<MessageId> ids = asList(messageId, messageId1); Collection<MessageId> ids = asList(messageId, messageId1);
@@ -1020,10 +987,10 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
oneOf(database).raiseAckFlag(txn, contactId, messageId); oneOf(database).raiseAckFlag(txn, contactId, messageId);
oneOf(database).commitTransaction(txn); oneOf(database).commitTransaction(txn);
// First time: the message was received and added // First time: the message was received and added
oneOf(eventBus).broadcast(with(any(MessageToAckEvent.class))); oneOf(eventBus).broadcast(with(any(MessagesToAckEvent.class)));
oneOf(eventBus).broadcast(with(any(MessageAddedEvent.class))); oneOf(eventBus).broadcast(with(any(MessageAddedEvent.class)));
// Second time: the message needs to be acked // Second time: the message needs to be acked
oneOf(eventBus).broadcast(with(any(MessageToAckEvent.class))); oneOf(eventBus).broadcast(with(any(MessagesToAckEvent.class)));
}}); }});
DatabaseComponent db = createDatabaseComponent(database, eventBus, DatabaseComponent db = createDatabaseComponent(database, eventBus,
eventExecutor, shutdownManager); eventExecutor, shutdownManager);
@@ -1051,7 +1018,7 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
oneOf(database).raiseAckFlag(txn, contactId, messageId); oneOf(database).raiseAckFlag(txn, contactId, messageId);
oneOf(database).commitTransaction(txn); oneOf(database).commitTransaction(txn);
// The message was received but not added // The message was received but not added
oneOf(eventBus).broadcast(with(any(MessageToAckEvent.class))); oneOf(eventBus).broadcast(with(any(MessagesToAckEvent.class)));
}}); }});
DatabaseComponent db = createDatabaseComponent(database, eventBus, DatabaseComponent db = createDatabaseComponent(database, eventBus,
eventExecutor, shutdownManager); eventExecutor, shutdownManager);
@@ -1082,19 +1049,15 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
public void testReceiveOffer() throws Exception { public void testReceiveOffer() throws Exception {
MessageId messageId1 = new MessageId(getRandomId()); MessageId messageId1 = new MessageId(getRandomId());
MessageId messageId2 = new MessageId(getRandomId()); MessageId messageId2 = new MessageId(getRandomId());
MessageId messageId3 = new MessageId(getRandomId());
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(database).startTransaction(); oneOf(database).startTransaction();
will(returnValue(txn)); will(returnValue(txn));
oneOf(database).containsContact(txn, contactId); oneOf(database).containsContact(txn, contactId);
will(returnValue(true)); will(returnValue(true));
// There's room for two more offered messages
oneOf(database).countOfferedMessages(txn, contactId);
will(returnValue(MAX_OFFERED_MESSAGES - 2));
// The first message isn't visible - request it // The first message isn't visible - request it
oneOf(database).containsVisibleMessage(txn, contactId, messageId); oneOf(database).containsVisibleMessage(txn, contactId, messageId);
will(returnValue(false)); will(returnValue(false));
oneOf(database).addOfferedMessage(txn, contactId, messageId);
// The second message is visible - ack it // The second message is visible - ack it
oneOf(database).containsVisibleMessage(txn, contactId, messageId1); oneOf(database).containsVisibleMessage(txn, contactId, messageId1);
will(returnValue(true)); will(returnValue(true));
@@ -1103,19 +1066,14 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
// The third message isn't visible - request it // The third message isn't visible - request it
oneOf(database).containsVisibleMessage(txn, contactId, messageId2); oneOf(database).containsVisibleMessage(txn, contactId, messageId2);
will(returnValue(false)); will(returnValue(false));
oneOf(database).addOfferedMessage(txn, contactId, messageId2);
// The fourth message isn't visible, but there's no room to store it
oneOf(database).containsVisibleMessage(txn, contactId, messageId3);
will(returnValue(false));
oneOf(database).commitTransaction(txn); oneOf(database).commitTransaction(txn);
oneOf(eventBus).broadcast(with(any(MessageToAckEvent.class))); oneOf(eventBus).broadcast(with(any(MessagesToAckEvent.class)));
oneOf(eventBus).broadcast(with(any(MessageToRequestEvent.class))); oneOf(eventBus).broadcast(with(any(MessagesToRequestEvent.class)));
}}); }});
DatabaseComponent db = createDatabaseComponent(database, eventBus, DatabaseComponent db = createDatabaseComponent(database, eventBus,
eventExecutor, shutdownManager); eventExecutor, shutdownManager);
Offer o = new Offer(asList(messageId, messageId1, Offer o = new Offer(asList(messageId, messageId1, messageId2));
messageId2, messageId3));
db.transaction(false, transaction -> db.transaction(false, transaction ->
db.receiveOffer(transaction, contactId, o)); db.receiveOffer(transaction, contactId, o));
} }

View File

@@ -41,7 +41,6 @@ import static org.briarproject.bramble.test.TestUtils.getGroup;
import static org.briarproject.bramble.test.TestUtils.getIdentity; import static org.briarproject.bramble.test.TestUtils.getIdentity;
import static org.briarproject.bramble.test.TestUtils.getMessage; import static org.briarproject.bramble.test.TestUtils.getMessage;
import static org.briarproject.bramble.test.TestUtils.getRandomBytes; import static org.briarproject.bramble.test.TestUtils.getRandomBytes;
import static org.briarproject.bramble.test.TestUtils.getRandomId;
import static org.briarproject.bramble.test.TestUtils.getTestDirectory; import static org.briarproject.bramble.test.TestUtils.getTestDirectory;
import static org.briarproject.bramble.test.UTest.Result.INCONCLUSIVE; import static org.briarproject.bramble.test.UTest.Result.INCONCLUSIVE;
import static org.briarproject.bramble.test.UTest.Z_CRITICAL_0_1; import static org.briarproject.bramble.test.UTest.Z_CRITICAL_0_1;
@@ -81,7 +80,6 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase {
private static final int METADATA_KEYS_PER_MESSAGE = 5; private static final int METADATA_KEYS_PER_MESSAGE = 5;
private static final int METADATA_KEY_LENGTH = 10; private static final int METADATA_KEY_LENGTH = 10;
private static final int METADATA_VALUE_LENGTH = 100; private static final int METADATA_VALUE_LENGTH = 100;
private static final int OFFERED_MESSAGES_PER_CONTACT = 100;
/** /**
* How many benchmark iterations to run in each block. * How many benchmark iterations to run in each block.
@@ -192,16 +190,6 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase {
}); });
} }
@Test
public void testCountOfferedMessages() throws Exception {
String name = "countOfferedMessages(T, ContactId)";
benchmark(name, db -> {
Connection txn = db.startTransaction();
db.countOfferedMessages(txn, pickRandom(contacts).getId());
db.commitTransaction(txn);
});
}
@Test @Test
public void testGetContact() throws Exception { public void testGetContact() throws Exception {
String name = "getContact(T, ContactId)"; String name = "getContact(T, ContactId)";
@@ -454,17 +442,6 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase {
}); });
} }
@Test
public void testGetMessagesToRequest() throws Exception {
String name = "getMessagesToRequest(T, ContactId, int)";
benchmark(name, db -> {
Connection txn = db.startTransaction();
db.getMessagesToRequest(txn, pickRandom(contacts).getId(),
MAX_MESSAGE_IDS);
db.commitTransaction(txn);
});
}
@Test @Test
public void testGetMessagesToSend() throws Exception { public void testGetMessagesToSend() throws Exception {
String name = "getMessagesToSend(T, ContactId, int)"; String name = "getMessagesToSend(T, ContactId, int)";
@@ -583,9 +560,6 @@ public abstract class DatabasePerformanceTest extends BrambleTestCase {
groupMessages.get(g.getId()).add(m.getId()); groupMessages.get(g.getId()).add(m.getId());
} }
} }
for (int j = 0; j < OFFERED_MESSAGES_PER_CONTACT; j++) {
db.addOfferedMessage(txn, c, new MessageId(getRandomId()));
}
} }
for (int i = 0; i < LOCAL_GROUPS; i++) { for (int i = 0; i < LOCAL_GROUPS; i++) {
Group g = getGroup(clientIds.get(i % CLIENTS), 123); Group g = getGroup(clientIds.get(i % CLIENTS), 123);

View File

@@ -41,7 +41,6 @@ import org.junit.Test;
import java.io.File; import java.io.File;
import java.sql.Connection; import java.sql.Connection;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@@ -1187,35 +1186,6 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
db.close(); db.close();
} }
@Test
public void testOfferedMessages() throws Exception {
Database<Connection> db = open(false);
Connection txn = db.startTransaction();
// Add a contact - initially there should be no offered messages
db.addIdentity(txn, identity);
assertEquals(contactId,
db.addContact(txn, author, localAuthor.getId(), null, true));
assertEquals(0, db.countOfferedMessages(txn, contactId));
// Add some offered messages and count them
List<MessageId> ids = new ArrayList<>();
for (int i = 0; i < 10; i++) {
MessageId m = new MessageId(getRandomId());
db.addOfferedMessage(txn, contactId, m);
ids.add(m);
}
assertEquals(10, db.countOfferedMessages(txn, contactId));
// Remove some of the offered messages and count again
List<MessageId> half = ids.subList(0, 5);
db.removeOfferedMessages(txn, contactId, half);
assertEquals(5, db.countOfferedMessages(txn, contactId));
db.commitTransaction(txn);
db.close();
}
@Test @Test
public void testGroupMetadata() throws Exception { public void testGroupMetadata() throws Exception {
Database<Connection> db = open(false); Database<Connection> db = open(false);