mirror of
https://code.briarproject.org/briar/briar.git
synced 2026-02-12 10:49:06 +01:00
Merge branch '1587-version-negotiation' into 'master'
Add version negotiation to sync protocol Closes #1587 See merge request briar/briar!1134
This commit is contained in:
@@ -29,6 +29,7 @@ import org.briarproject.bramble.api.transport.TransportKeySet;
|
||||
import org.briarproject.bramble.api.transport.TransportKeys;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
@@ -427,6 +428,13 @@ public interface DatabaseComponent extends TransactionManager {
|
||||
*/
|
||||
Settings getSettings(Transaction txn, String namespace) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns the versions of the sync protocol supported by the given contact.
|
||||
* <p/>
|
||||
* Read-only.
|
||||
*/
|
||||
List<Byte> getSyncVersions(Transaction txn, ContactId c) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns all transport keys for the given transport.
|
||||
* <p/>
|
||||
@@ -579,6 +587,12 @@ public interface DatabaseComponent extends TransactionManager {
|
||||
void setReorderingWindow(Transaction txn, KeySetId k, TransportId t,
|
||||
long timePeriod, long base, byte[] bitmap) throws DbException;
|
||||
|
||||
/**
|
||||
* Sets the versions of the sync protocol supported by the given contact.
|
||||
*/
|
||||
void setSyncVersions(Transaction txn, ContactId c, List<Byte> supported)
|
||||
throws DbException;
|
||||
|
||||
/**
|
||||
* Marks the given transport keys as usable for outgoing streams.
|
||||
*/
|
||||
|
||||
@@ -1,10 +1,16 @@
|
||||
package org.briarproject.bramble.api.sync;
|
||||
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import javax.annotation.concurrent.Immutable;
|
||||
|
||||
/**
|
||||
* A record acknowledging receipt of one or more {@link Message Messages}.
|
||||
*/
|
||||
@Immutable
|
||||
@NotNullByDefault
|
||||
public class Ack {
|
||||
|
||||
private final Collection<MessageId> acked;
|
||||
|
||||
@@ -1,8 +1,14 @@
|
||||
package org.briarproject.bramble.api.sync;
|
||||
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
|
||||
import javax.annotation.concurrent.Immutable;
|
||||
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_BODY_LENGTH;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
|
||||
|
||||
@Immutable
|
||||
@NotNullByDefault
|
||||
public class Message {
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,10 +1,16 @@
|
||||
package org.briarproject.bramble.api.sync;
|
||||
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import javax.annotation.concurrent.Immutable;
|
||||
|
||||
/**
|
||||
* A record offering the recipient one or more {@link Message Messages}.
|
||||
*/
|
||||
@Immutable
|
||||
@NotNullByDefault
|
||||
public class Offer {
|
||||
|
||||
private final Collection<MessageId> offered;
|
||||
|
||||
@@ -9,5 +9,5 @@ public interface RecordTypes {
|
||||
byte MESSAGE = 1;
|
||||
byte OFFER = 2;
|
||||
byte REQUEST = 3;
|
||||
|
||||
byte VERSIONS = 4;
|
||||
}
|
||||
|
||||
@@ -1,10 +1,16 @@
|
||||
package org.briarproject.bramble.api.sync;
|
||||
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import javax.annotation.concurrent.Immutable;
|
||||
|
||||
/**
|
||||
* A record requesting one or more {@link Message Messages} from the recipient.
|
||||
*/
|
||||
@Immutable
|
||||
@NotNullByDefault
|
||||
public class Request {
|
||||
|
||||
private final Collection<MessageId> requested;
|
||||
|
||||
@@ -2,6 +2,9 @@ package org.briarproject.bramble.api.sync;
|
||||
|
||||
import org.briarproject.bramble.api.UniqueId;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static java.util.Collections.singletonList;
|
||||
import static org.briarproject.bramble.api.record.Record.MAX_RECORD_PAYLOAD_BYTES;
|
||||
|
||||
public interface SyncConstants {
|
||||
@@ -11,6 +14,11 @@ public interface SyncConstants {
|
||||
*/
|
||||
byte PROTOCOL_VERSION = 0;
|
||||
|
||||
/**
|
||||
* The versions of the sync protocol this peer supports.
|
||||
*/
|
||||
List<Byte> SUPPORTED_VERSIONS = singletonList(PROTOCOL_VERSION);
|
||||
|
||||
/**
|
||||
* The maximum length of a group descriptor in bytes.
|
||||
*/
|
||||
@@ -35,4 +43,10 @@ public interface SyncConstants {
|
||||
* The maximum number of message IDs in an ack, offer or request record.
|
||||
*/
|
||||
int MAX_MESSAGE_IDS = MAX_RECORD_PAYLOAD_BYTES / UniqueId.LENGTH;
|
||||
|
||||
/**
|
||||
* The maximum number of versions of the sync protocol a peer may support
|
||||
* simultaneously.
|
||||
*/
|
||||
int MAX_SUPPORTED_VERSIONS = 10;
|
||||
}
|
||||
|
||||
@@ -25,4 +25,7 @@ public interface SyncRecordReader {
|
||||
|
||||
Request readRequest() throws IOException;
|
||||
|
||||
boolean hasVersions() throws IOException;
|
||||
|
||||
Versions readVersions() throws IOException;
|
||||
}
|
||||
|
||||
@@ -15,5 +15,7 @@ public interface SyncRecordWriter {
|
||||
|
||||
void writeRequest(Request r) throws IOException;
|
||||
|
||||
void writeVersions(Versions v) throws IOException;
|
||||
|
||||
void flush() throws IOException;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
package org.briarproject.bramble.api.sync;
|
||||
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import javax.annotation.concurrent.Immutable;
|
||||
|
||||
/**
|
||||
* A record telling the recipient which versions of the sync protocol the
|
||||
* sender supports.
|
||||
*/
|
||||
@Immutable
|
||||
@NotNullByDefault
|
||||
public class Versions {
|
||||
|
||||
private final List<Byte> supported;
|
||||
|
||||
public Versions(List<Byte> supported) {
|
||||
this.supported = supported;
|
||||
}
|
||||
|
||||
public List<Byte> getSupportedVersions() {
|
||||
return supported;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
package org.briarproject.bramble.api.sync.event;
|
||||
|
||||
import org.briarproject.bramble.api.contact.ContactId;
|
||||
import org.briarproject.bramble.api.event.Event;
|
||||
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import javax.annotation.concurrent.Immutable;
|
||||
|
||||
/**
|
||||
* An event that is broadcast when the versions of the sync protocol supported
|
||||
* by a contact are updated.
|
||||
*/
|
||||
@Immutable
|
||||
@NotNullByDefault
|
||||
public class SyncVersionsUpdatedEvent extends Event {
|
||||
|
||||
private final ContactId contactId;
|
||||
private final List<Byte> supported;
|
||||
|
||||
public SyncVersionsUpdatedEvent(ContactId contactId, List<Byte> supported) {
|
||||
this.contactId = contactId;
|
||||
this.supported = supported;
|
||||
}
|
||||
|
||||
public ContactId getContactId() {
|
||||
return contactId;
|
||||
}
|
||||
|
||||
public List<Byte> getSupportedVersions() {
|
||||
return supported;
|
||||
}
|
||||
}
|
||||
@@ -33,6 +33,7 @@ import org.briarproject.bramble.api.transport.TransportKeySet;
|
||||
import org.briarproject.bramble.api.transport.TransportKeys;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
@@ -528,6 +529,13 @@ interface Database<T> {
|
||||
*/
|
||||
Settings getSettings(T txn, String namespace) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns the versions of the sync protocol supported by the given contact.
|
||||
* <p/>
|
||||
* Read-only.
|
||||
*/
|
||||
List<Byte> getSyncVersions(T txn, ContactId c) throws DbException;
|
||||
|
||||
/**
|
||||
* Returns all transport keys for the given transport.
|
||||
* <p/>
|
||||
@@ -700,6 +708,12 @@ interface Database<T> {
|
||||
void setReorderingWindow(T txn, KeySetId k, TransportId t,
|
||||
long timePeriod, long base, byte[] bitmap) throws DbException;
|
||||
|
||||
/**
|
||||
* Sets the versions of the sync protocol supported by the given contact.
|
||||
*/
|
||||
void setSyncVersions(T txn, ContactId c, List<Byte> supported)
|
||||
throws DbException;
|
||||
|
||||
/**
|
||||
* Marks the given transport keys as usable for outgoing streams.
|
||||
*/
|
||||
|
||||
@@ -65,6 +65,7 @@ import org.briarproject.bramble.api.sync.event.MessageToAckEvent;
|
||||
import org.briarproject.bramble.api.sync.event.MessageToRequestEvent;
|
||||
import org.briarproject.bramble.api.sync.event.MessagesAckedEvent;
|
||||
import org.briarproject.bramble.api.sync.event.MessagesSentEvent;
|
||||
import org.briarproject.bramble.api.sync.event.SyncVersionsUpdatedEvent;
|
||||
import org.briarproject.bramble.api.sync.validation.MessageState;
|
||||
import org.briarproject.bramble.api.transport.KeySetId;
|
||||
import org.briarproject.bramble.api.transport.TransportKeySet;
|
||||
@@ -716,6 +717,15 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
||||
return db.getSettings(txn, namespace);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Byte> getSyncVersions(Transaction transaction, ContactId c)
|
||||
throws DbException {
|
||||
T txn = unbox(transaction);
|
||||
if (!db.containsContact(txn, c))
|
||||
throw new NoSuchContactException();
|
||||
return db.getSyncVersions(txn, c);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<TransportKeySet> getTransportKeys(Transaction transaction,
|
||||
TransportId t) throws DbException {
|
||||
@@ -1046,6 +1056,17 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
|
||||
db.setReorderingWindow(txn, k, t, timePeriod, base, bitmap);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setSyncVersions(Transaction transaction, ContactId c,
|
||||
List<Byte> supported) throws DbException {
|
||||
if (transaction.isReadOnly()) throw new IllegalArgumentException();
|
||||
T txn = unbox(transaction);
|
||||
if (!db.containsContact(txn, c))
|
||||
throw new NoSuchContactException();
|
||||
db.setSyncVersions(txn, c, supported);
|
||||
transaction.attach(new SyncVersionsUpdatedEvent(c, supported));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTransportKeysActive(Transaction transaction, TransportId t,
|
||||
KeySetId k) throws DbException {
|
||||
|
||||
@@ -98,7 +98,7 @@ import static org.briarproject.bramble.util.LogUtils.now;
|
||||
abstract class JdbcDatabase implements Database<Connection> {
|
||||
|
||||
// Package access for testing
|
||||
static final int CODE_SCHEMA_VERSION = 46;
|
||||
static final int CODE_SCHEMA_VERSION = 47;
|
||||
|
||||
// Time period offsets for incoming transport keys
|
||||
private static final int OFFSET_PREV = -1;
|
||||
@@ -135,6 +135,7 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
+ " handshakePublicKey _BINARY," // Null if key is unknown
|
||||
+ " localAuthorId _HASH NOT NULL,"
|
||||
+ " verified BOOLEAN NOT NULL,"
|
||||
+ " syncVersions _BINARY DEFAULT '00' NOT NULL,"
|
||||
+ " PRIMARY KEY (contactId),"
|
||||
+ " FOREIGN KEY (localAuthorId)"
|
||||
+ " REFERENCES localAuthors (authorId)"
|
||||
@@ -461,7 +462,8 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
new Migration42_43(dbTypes),
|
||||
new Migration43_44(dbTypes),
|
||||
new Migration44_45(),
|
||||
new Migration45_46()
|
||||
new Migration45_46(),
|
||||
new Migration46_47(dbTypes)
|
||||
);
|
||||
}
|
||||
|
||||
@@ -2328,6 +2330,32 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Byte> getSyncVersions(Connection txn, ContactId c)
|
||||
throws DbException {
|
||||
PreparedStatement ps = null;
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
String sql = "SELECT syncVersions FROM contacts"
|
||||
+ " WHERE contactId = ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
ps.setInt(1, c.getInt());
|
||||
rs = ps.executeQuery();
|
||||
if (!rs.next()) throw new DbStateException();
|
||||
byte[] bytes = rs.getBytes(1);
|
||||
List<Byte> supported = new ArrayList<>(bytes.length);
|
||||
for (byte b : bytes) supported.add(b);
|
||||
if (rs.next()) throw new DbStateException();
|
||||
rs.close();
|
||||
ps.close();
|
||||
return supported;
|
||||
} catch (SQLException e) {
|
||||
tryToClose(rs, LOG, WARNING);
|
||||
tryToClose(ps, LOG, WARNING);
|
||||
throw new DbException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<TransportKeySet> getTransportKeys(Connection txn,
|
||||
TransportId t) throws DbException {
|
||||
@@ -3161,6 +3189,29 @@ abstract class JdbcDatabase implements Database<Connection> {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setSyncVersions(Connection txn, ContactId c,
|
||||
List<Byte> supported) throws DbException {
|
||||
PreparedStatement ps = null;
|
||||
try {
|
||||
String sql = "UPDATE contacts SET syncVersions = ?"
|
||||
+ " WHERE contactId = ?";
|
||||
ps = txn.prepareStatement(sql);
|
||||
byte[] bytes = new byte[supported.size()];
|
||||
for (int i = 0; i < bytes.length; i++) {
|
||||
bytes[i] = supported.get(i);
|
||||
}
|
||||
ps.setBytes(1, bytes);
|
||||
ps.setInt(2, c.getInt());
|
||||
int affected = ps.executeUpdate();
|
||||
if (affected < 0 || affected > 1) throw new DbStateException();
|
||||
ps.close();
|
||||
} catch (SQLException e) {
|
||||
tryToClose(ps, LOG, WARNING);
|
||||
throw new DbException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTransportKeysActive(Connection txn, TransportId t,
|
||||
KeySetId k) throws DbException {
|
||||
|
||||
@@ -31,8 +31,7 @@ class Migration45_46 implements Migration<Connection> {
|
||||
try {
|
||||
s = txn.createStatement();
|
||||
s.execute("ALTER TABLE messages"
|
||||
+ " ADD COLUMN temporary BOOLEAN NOT NULL"
|
||||
+ " DEFAULT (FALSE)");
|
||||
+ " ADD COLUMN temporary BOOLEAN DEFAULT FALSE NOT NULL");
|
||||
} catch (SQLException e) {
|
||||
tryToClose(s, LOG, WARNING);
|
||||
throw new DbException(e);
|
||||
|
||||
@@ -0,0 +1,47 @@
|
||||
package org.briarproject.bramble.db;
|
||||
|
||||
import org.briarproject.bramble.api.db.DbException;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import static java.util.logging.Level.WARNING;
|
||||
import static java.util.logging.Logger.getLogger;
|
||||
import static org.briarproject.bramble.db.JdbcUtils.tryToClose;
|
||||
|
||||
class Migration46_47 implements Migration<Connection> {
|
||||
|
||||
private static final Logger LOG = getLogger(Migration46_47.class.getName());
|
||||
|
||||
private final DatabaseTypes dbTypes;
|
||||
|
||||
Migration46_47(DatabaseTypes dbTypes) {
|
||||
this.dbTypes = dbTypes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getStartVersion() {
|
||||
return 46;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getEndVersion() {
|
||||
return 47;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void migrate(Connection txn) throws DbException {
|
||||
Statement s = null;
|
||||
try {
|
||||
s = txn.createStatement();
|
||||
s.execute(dbTypes.replaceTypes("ALTER TABLE contacts"
|
||||
+ " ADD COLUMN syncVersions"
|
||||
+ " _BINARY DEFAULT '00' NOT NULL"));
|
||||
} catch (SQLException e) {
|
||||
tryToClose(s, LOG, WARNING);
|
||||
throw new DbException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -17,6 +17,7 @@ import org.briarproject.bramble.api.sync.Offer;
|
||||
import org.briarproject.bramble.api.sync.Request;
|
||||
import org.briarproject.bramble.api.sync.SyncRecordWriter;
|
||||
import org.briarproject.bramble.api.sync.SyncSession;
|
||||
import org.briarproject.bramble.api.sync.Versions;
|
||||
import org.briarproject.bramble.api.sync.event.GroupVisibilityUpdatedEvent;
|
||||
import org.briarproject.bramble.api.sync.event.MessageRequestedEvent;
|
||||
import org.briarproject.bramble.api.sync.event.MessageSharedEvent;
|
||||
@@ -39,9 +40,11 @@ import javax.annotation.concurrent.ThreadSafe;
|
||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||
import static java.util.logging.Level.INFO;
|
||||
import static java.util.logging.Level.WARNING;
|
||||
import static java.util.logging.Logger.getLogger;
|
||||
import static org.briarproject.bramble.api.lifecycle.LifecycleManager.LifecycleState.STOPPING;
|
||||
import static org.briarproject.bramble.api.record.Record.MAX_RECORD_PAYLOAD_BYTES;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.SUPPORTED_VERSIONS;
|
||||
import static org.briarproject.bramble.util.LogUtils.logException;
|
||||
|
||||
/**
|
||||
@@ -55,7 +58,7 @@ import static org.briarproject.bramble.util.LogUtils.logException;
|
||||
class DuplexOutgoingSession implements SyncSession, EventListener {
|
||||
|
||||
private static final Logger LOG =
|
||||
Logger.getLogger(DuplexOutgoingSession.class.getName());
|
||||
getLogger(DuplexOutgoingSession.class.getName());
|
||||
|
||||
private static final ThrowingRunnable<IOException> CLOSE = () -> {
|
||||
};
|
||||
@@ -103,6 +106,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
|
||||
public void run() throws IOException {
|
||||
eventBus.addListener(this);
|
||||
try {
|
||||
// Send our supported protocol versions
|
||||
recordWriter.writeVersions(new Versions(SUPPORTED_VERSIONS));
|
||||
// Start a query for each type of record
|
||||
generateAck();
|
||||
generateBatch();
|
||||
|
||||
@@ -18,14 +18,17 @@ import org.briarproject.bramble.api.sync.Offer;
|
||||
import org.briarproject.bramble.api.sync.Request;
|
||||
import org.briarproject.bramble.api.sync.SyncRecordReader;
|
||||
import org.briarproject.bramble.api.sync.SyncSession;
|
||||
import org.briarproject.bramble.api.sync.Versions;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import javax.annotation.concurrent.ThreadSafe;
|
||||
|
||||
import static java.util.logging.Level.WARNING;
|
||||
import static java.util.logging.Logger.getLogger;
|
||||
import static org.briarproject.bramble.api.lifecycle.LifecycleManager.LifecycleState.STOPPING;
|
||||
import static org.briarproject.bramble.util.LogUtils.logException;
|
||||
|
||||
@@ -37,7 +40,7 @@ import static org.briarproject.bramble.util.LogUtils.logException;
|
||||
class IncomingSession implements SyncSession, EventListener {
|
||||
|
||||
private static final Logger LOG =
|
||||
Logger.getLogger(IncomingSession.class.getName());
|
||||
getLogger(IncomingSession.class.getName());
|
||||
|
||||
private final DatabaseComponent db;
|
||||
private final Executor dbExecutor;
|
||||
@@ -80,6 +83,9 @@ class IncomingSession implements SyncSession, EventListener {
|
||||
} else if (recordReader.hasRequest()) {
|
||||
Request r = recordReader.readRequest();
|
||||
dbExecutor.execute(new ReceiveRequest(r));
|
||||
} else if (recordReader.hasVersions()) {
|
||||
Versions v = recordReader.readVersions();
|
||||
dbExecutor.execute(new ReceiveVersions(v));
|
||||
} else {
|
||||
// unknown records are ignored in RecordReader#eof()
|
||||
throw new FormatException();
|
||||
@@ -190,4 +196,26 @@ class IncomingSession implements SyncSession, EventListener {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class ReceiveVersions implements Runnable {
|
||||
|
||||
private final Versions versions;
|
||||
|
||||
private ReceiveVersions(Versions versions) {
|
||||
this.versions = versions;
|
||||
}
|
||||
|
||||
@DatabaseExecutor
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
List<Byte> supported = versions.getSupportedVersions();
|
||||
db.transaction(false,
|
||||
txn -> db.setSyncVersions(txn, contactId, supported));
|
||||
} catch (DbException e) {
|
||||
logException(LOG, WARNING, e);
|
||||
interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ import org.briarproject.bramble.api.sync.Ack;
|
||||
import org.briarproject.bramble.api.sync.Message;
|
||||
import org.briarproject.bramble.api.sync.SyncRecordWriter;
|
||||
import org.briarproject.bramble.api.sync.SyncSession;
|
||||
import org.briarproject.bramble.api.sync.Versions;
|
||||
import org.briarproject.bramble.api.transport.StreamWriter;
|
||||
|
||||
import java.io.IOException;
|
||||
@@ -29,9 +30,11 @@ import javax.annotation.concurrent.ThreadSafe;
|
||||
|
||||
import static java.util.logging.Level.INFO;
|
||||
import static java.util.logging.Level.WARNING;
|
||||
import static java.util.logging.Logger.getLogger;
|
||||
import static org.briarproject.bramble.api.lifecycle.LifecycleManager.LifecycleState.STOPPING;
|
||||
import static org.briarproject.bramble.api.record.Record.MAX_RECORD_PAYLOAD_BYTES;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.SUPPORTED_VERSIONS;
|
||||
import static org.briarproject.bramble.util.LogUtils.logException;
|
||||
|
||||
/**
|
||||
@@ -44,7 +47,7 @@ import static org.briarproject.bramble.util.LogUtils.logException;
|
||||
class SimplexOutgoingSession implements SyncSession, EventListener {
|
||||
|
||||
private static final Logger LOG =
|
||||
Logger.getLogger(SimplexOutgoingSession.class.getName());
|
||||
getLogger(SimplexOutgoingSession.class.getName());
|
||||
|
||||
private static final ThrowingRunnable<IOException> CLOSE = () -> {
|
||||
};
|
||||
@@ -80,6 +83,8 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
|
||||
public void run() throws IOException {
|
||||
eventBus.addListener(this);
|
||||
try {
|
||||
// Send our supported protocol versions
|
||||
recordWriter.writeVersions(new Versions(SUPPORTED_VERSIONS));
|
||||
// Start a query for each type of record
|
||||
dbExecutor.execute(new GenerateAck());
|
||||
dbExecutor.execute(new GenerateBatch());
|
||||
|
||||
@@ -13,6 +13,7 @@ import org.briarproject.bramble.api.sync.MessageId;
|
||||
import org.briarproject.bramble.api.sync.Offer;
|
||||
import org.briarproject.bramble.api.sync.Request;
|
||||
import org.briarproject.bramble.api.sync.SyncRecordReader;
|
||||
import org.briarproject.bramble.api.sync.Versions;
|
||||
import org.briarproject.bramble.util.ByteUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
@@ -26,6 +27,8 @@ import static org.briarproject.bramble.api.sync.RecordTypes.ACK;
|
||||
import static org.briarproject.bramble.api.sync.RecordTypes.MESSAGE;
|
||||
import static org.briarproject.bramble.api.sync.RecordTypes.OFFER;
|
||||
import static org.briarproject.bramble.api.sync.RecordTypes.REQUEST;
|
||||
import static org.briarproject.bramble.api.sync.RecordTypes.VERSIONS;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_SUPPORTED_VERSIONS;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.PROTOCOL_VERSION;
|
||||
|
||||
@@ -45,7 +48,7 @@ class SyncRecordReaderImpl implements SyncRecordReader {
|
||||
|
||||
private static boolean isKnownRecordType(byte type) {
|
||||
return type == ACK || type == MESSAGE || type == OFFER ||
|
||||
type == REQUEST;
|
||||
type == REQUEST || type == VERSIONS;
|
||||
}
|
||||
|
||||
private final MessageFactory messageFactory;
|
||||
@@ -148,4 +151,27 @@ class SyncRecordReaderImpl implements SyncRecordReader {
|
||||
if (!hasRequest()) throw new FormatException();
|
||||
return new Request(readMessageIds());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasVersions() throws IOException {
|
||||
return !eof() && getNextRecordType() == VERSIONS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Versions readVersions() throws IOException {
|
||||
if (!hasVersions()) throw new FormatException();
|
||||
return new Versions(readSupportedVersions());
|
||||
}
|
||||
|
||||
private List<Byte> readSupportedVersions() throws IOException {
|
||||
if (nextRecord == null) throw new AssertionError();
|
||||
byte[] payload = nextRecord.getPayload();
|
||||
if (payload.length == 0) throw new FormatException();
|
||||
if (payload.length > MAX_SUPPORTED_VERSIONS)
|
||||
throw new FormatException();
|
||||
List<Byte> supported = new ArrayList<>(payload.length);
|
||||
for (byte b : payload) supported.add(b);
|
||||
nextRecord = null;
|
||||
return supported;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ import org.briarproject.bramble.api.sync.MessageId;
|
||||
import org.briarproject.bramble.api.sync.Offer;
|
||||
import org.briarproject.bramble.api.sync.Request;
|
||||
import org.briarproject.bramble.api.sync.SyncRecordWriter;
|
||||
import org.briarproject.bramble.api.sync.Versions;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
@@ -20,6 +21,7 @@ import static org.briarproject.bramble.api.sync.RecordTypes.ACK;
|
||||
import static org.briarproject.bramble.api.sync.RecordTypes.MESSAGE;
|
||||
import static org.briarproject.bramble.api.sync.RecordTypes.OFFER;
|
||||
import static org.briarproject.bramble.api.sync.RecordTypes.REQUEST;
|
||||
import static org.briarproject.bramble.api.sync.RecordTypes.VERSIONS;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.PROTOCOL_VERSION;
|
||||
|
||||
@NotThreadSafe
|
||||
@@ -65,6 +67,12 @@ class SyncRecordWriterImpl implements SyncRecordWriter {
|
||||
writeRecord(REQUEST);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeVersions(Versions v) throws IOException {
|
||||
for (byte b : v.getSupportedVersions()) payload.write(b);
|
||||
writeRecord(VERSIONS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() throws IOException {
|
||||
writer.flush();
|
||||
|
||||
@@ -66,6 +66,7 @@ import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static org.briarproject.bramble.api.sync.Group.Visibility.INVISIBLE;
|
||||
@@ -294,11 +295,11 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
throws Exception {
|
||||
context.checking(new Expectations() {{
|
||||
// Check whether the contact is in the DB (which it's not)
|
||||
exactly(16).of(database).startTransaction();
|
||||
exactly(18).of(database).startTransaction();
|
||||
will(returnValue(txn));
|
||||
exactly(16).of(database).containsContact(txn, contactId);
|
||||
exactly(18).of(database).containsContact(txn, contactId);
|
||||
will(returnValue(false));
|
||||
exactly(16).of(database).abortTransaction(txn);
|
||||
exactly(18).of(database).abortTransaction(txn);
|
||||
}});
|
||||
DatabaseComponent db = createDatabaseComponent(database, eventBus,
|
||||
eventExecutor, shutdownManager);
|
||||
@@ -376,6 +377,14 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
// Expected
|
||||
}
|
||||
|
||||
try {
|
||||
db.transaction(false, transaction ->
|
||||
db.getSyncVersions(transaction, contactId));
|
||||
fail();
|
||||
} catch (NoSuchContactException expected) {
|
||||
// Expected
|
||||
}
|
||||
|
||||
try {
|
||||
Ack a = new Ack(singletonList(messageId));
|
||||
db.transaction(false, transaction ->
|
||||
@@ -435,6 +444,14 @@ public class DatabaseComponentImplTest extends BrambleMockTestCase {
|
||||
} catch (NoSuchContactException expected) {
|
||||
// Expected
|
||||
}
|
||||
|
||||
try {
|
||||
db.transaction(false, transaction ->
|
||||
db.setSyncVersions(transaction, contactId, emptyList()));
|
||||
fail();
|
||||
} catch (NoSuchContactException expected) {
|
||||
// Expected
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -41,7 +41,6 @@ import org.junit.Test;
|
||||
import java.io.File;
|
||||
import java.sql.Connection;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -51,6 +50,7 @@ import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.singletonList;
|
||||
@@ -407,10 +407,10 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
|
||||
|
||||
// Both message IDs should be returned
|
||||
Collection<MessageId> ids = db.getMessagesToAck(txn, contactId, 1234);
|
||||
assertEquals(Arrays.asList(messageId, messageId1), ids);
|
||||
assertEquals(asList(messageId, messageId1), ids);
|
||||
|
||||
// Remove both message IDs
|
||||
db.lowerAckFlag(txn, contactId, Arrays.asList(messageId, messageId1));
|
||||
db.lowerAckFlag(txn, contactId, asList(messageId, messageId1));
|
||||
|
||||
// Both message IDs should have been removed
|
||||
assertEquals(emptyList(), db.getMessagesToAck(txn,
|
||||
@@ -422,7 +422,7 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
|
||||
|
||||
// Both message IDs should be returned
|
||||
ids = db.getMessagesToAck(txn, contactId, 1234);
|
||||
assertEquals(Arrays.asList(messageId, messageId1), ids);
|
||||
assertEquals(asList(messageId, messageId1), ids);
|
||||
|
||||
db.commitTransaction(txn);
|
||||
db.close();
|
||||
@@ -2286,6 +2286,29 @@ public abstract class JdbcDatabaseTest extends BrambleTestCase {
|
||||
db.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSyncVersions() throws Exception {
|
||||
Database<Connection> db = open(false);
|
||||
Connection txn = db.startTransaction();
|
||||
|
||||
// Add a contact
|
||||
db.addIdentity(txn, identity);
|
||||
assertEquals(contactId,
|
||||
db.addContact(txn, author, localAuthor.getId(), null, true));
|
||||
|
||||
// Only sync version 0 should be supported by default
|
||||
List<Byte> defaultSupported = singletonList((byte) 0);
|
||||
assertEquals(defaultSupported, db.getSyncVersions(txn, contactId));
|
||||
|
||||
// Set the supported versions and check that they're returned
|
||||
List<Byte> supported = asList((byte) 0, (byte) 1);
|
||||
db.setSyncVersions(txn, contactId, supported);
|
||||
assertEquals(supported, db.getSyncVersions(txn, contactId));
|
||||
|
||||
db.commitTransaction(txn);
|
||||
db.close();
|
||||
}
|
||||
|
||||
private Database<Connection> open(boolean resume) throws Exception {
|
||||
return open(resume, new TestMessageFactory(), new SystemClock());
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import org.briarproject.bramble.api.sync.GroupId;
|
||||
import org.briarproject.bramble.api.sync.Message;
|
||||
import org.briarproject.bramble.api.sync.MessageId;
|
||||
import org.briarproject.bramble.api.sync.SyncRecordWriter;
|
||||
import org.briarproject.bramble.api.sync.Versions;
|
||||
import org.briarproject.bramble.api.transport.StreamWriter;
|
||||
import org.briarproject.bramble.test.BrambleMockTestCase;
|
||||
import org.briarproject.bramble.test.DbExpectations;
|
||||
@@ -49,6 +50,8 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
|
||||
context.checking(new DbExpectations() {{
|
||||
// Add listener
|
||||
oneOf(eventBus).addListener(session);
|
||||
// Send the protocol versions
|
||||
oneOf(recordWriter).writeVersions(with(any(Versions.class)));
|
||||
// No acks to send
|
||||
oneOf(db).transactionWithNullableResult(with(false),
|
||||
withNullableDbCallable(noAckTxn));
|
||||
@@ -83,6 +86,8 @@ public class SimplexOutgoingSessionTest extends BrambleMockTestCase {
|
||||
context.checking(new DbExpectations() {{
|
||||
// Add listener
|
||||
oneOf(eventBus).addListener(session);
|
||||
// Send the protocol versions
|
||||
oneOf(recordWriter).writeVersions(with(any(Versions.class)));
|
||||
// One ack to send
|
||||
oneOf(db).transactionWithNullableResult(with(false),
|
||||
withNullableDbCallable(ackTxn));
|
||||
|
||||
@@ -10,11 +10,14 @@ import org.briarproject.bramble.api.sync.MessageFactory;
|
||||
import org.briarproject.bramble.api.sync.Offer;
|
||||
import org.briarproject.bramble.api.sync.Request;
|
||||
import org.briarproject.bramble.api.sync.SyncRecordReader;
|
||||
import org.briarproject.bramble.api.sync.Versions;
|
||||
import org.briarproject.bramble.test.BrambleMockTestCase;
|
||||
import org.jmock.Expectations;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.util.List;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
@@ -22,7 +25,9 @@ import static org.briarproject.bramble.api.record.Record.MAX_RECORD_PAYLOAD_BYTE
|
||||
import static org.briarproject.bramble.api.sync.RecordTypes.ACK;
|
||||
import static org.briarproject.bramble.api.sync.RecordTypes.OFFER;
|
||||
import static org.briarproject.bramble.api.sync.RecordTypes.REQUEST;
|
||||
import static org.briarproject.bramble.api.sync.RecordTypes.VERSIONS;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_SUPPORTED_VERSIONS;
|
||||
import static org.briarproject.bramble.api.sync.SyncConstants.PROTOCOL_VERSION;
|
||||
import static org.briarproject.bramble.test.TestUtils.getRandomId;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
@@ -35,12 +40,17 @@ public class SyncRecordReaderImplTest extends BrambleMockTestCase {
|
||||
context.mock(MessageFactory.class);
|
||||
private final RecordReader recordReader = context.mock(RecordReader.class);
|
||||
|
||||
private SyncRecordReader reader;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
reader = new SyncRecordReaderImpl(messageFactory, recordReader);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoFormatExceptionIfAckIsMaximumSize() throws Exception {
|
||||
expectReadRecord(createAck());
|
||||
|
||||
SyncRecordReader reader =
|
||||
new SyncRecordReaderImpl(messageFactory, recordReader);
|
||||
Ack ack = reader.readAck();
|
||||
assertEquals(MAX_MESSAGE_IDS, ack.getMessageIds().size());
|
||||
}
|
||||
@@ -49,8 +59,6 @@ public class SyncRecordReaderImplTest extends BrambleMockTestCase {
|
||||
public void testFormatExceptionIfAckIsEmpty() throws Exception {
|
||||
expectReadRecord(createEmptyAck());
|
||||
|
||||
SyncRecordReader reader =
|
||||
new SyncRecordReaderImpl(messageFactory, recordReader);
|
||||
reader.readAck();
|
||||
}
|
||||
|
||||
@@ -58,8 +66,6 @@ public class SyncRecordReaderImplTest extends BrambleMockTestCase {
|
||||
public void testNoFormatExceptionIfOfferIsMaximumSize() throws Exception {
|
||||
expectReadRecord(createOffer());
|
||||
|
||||
SyncRecordReader reader =
|
||||
new SyncRecordReaderImpl(messageFactory, recordReader);
|
||||
Offer offer = reader.readOffer();
|
||||
assertEquals(MAX_MESSAGE_IDS, offer.getMessageIds().size());
|
||||
}
|
||||
@@ -68,8 +74,6 @@ public class SyncRecordReaderImplTest extends BrambleMockTestCase {
|
||||
public void testFormatExceptionIfOfferIsEmpty() throws Exception {
|
||||
expectReadRecord(createEmptyOffer());
|
||||
|
||||
SyncRecordReader reader =
|
||||
new SyncRecordReaderImpl(messageFactory, recordReader);
|
||||
reader.readOffer();
|
||||
}
|
||||
|
||||
@@ -77,8 +81,6 @@ public class SyncRecordReaderImplTest extends BrambleMockTestCase {
|
||||
public void testNoFormatExceptionIfRequestIsMaximumSize() throws Exception {
|
||||
expectReadRecord(createRequest());
|
||||
|
||||
SyncRecordReader reader =
|
||||
new SyncRecordReaderImpl(messageFactory, recordReader);
|
||||
Request request = reader.readRequest();
|
||||
assertEquals(MAX_MESSAGE_IDS, request.getMessageIds().size());
|
||||
}
|
||||
@@ -87,11 +89,36 @@ public class SyncRecordReaderImplTest extends BrambleMockTestCase {
|
||||
public void testFormatExceptionIfRequestIsEmpty() throws Exception {
|
||||
expectReadRecord(createEmptyRequest());
|
||||
|
||||
SyncRecordReader reader =
|
||||
new SyncRecordReaderImpl(messageFactory, recordReader);
|
||||
reader.readRequest();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoFormatExceptionIfVersionsIsMaximumSize()
|
||||
throws Exception {
|
||||
expectReadRecord(createVersions(MAX_SUPPORTED_VERSIONS));
|
||||
|
||||
Versions versions = reader.readVersions();
|
||||
List<Byte> supported = versions.getSupportedVersions();
|
||||
assertEquals(MAX_SUPPORTED_VERSIONS, supported.size());
|
||||
for (int i = 0; i < supported.size(); i++) {
|
||||
assertEquals(i, (int) supported.get(i));
|
||||
}
|
||||
}
|
||||
|
||||
@Test(expected = FormatException.class)
|
||||
public void testFormatExceptionIfVersionsIsEmpty() throws Exception {
|
||||
expectReadRecord(createVersions(0));
|
||||
|
||||
reader.readVersions();
|
||||
}
|
||||
|
||||
@Test(expected = FormatException.class)
|
||||
public void testFormatExceptionIfVersionsIsTooLarge() throws Exception {
|
||||
expectReadRecord(createVersions(MAX_SUPPORTED_VERSIONS + 1));
|
||||
|
||||
reader.readVersions();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEofReturnsTrueWhenAtEndOfStream() throws Exception {
|
||||
expectReadRecord(createAck());
|
||||
@@ -140,6 +167,12 @@ public class SyncRecordReaderImplTest extends BrambleMockTestCase {
|
||||
return new Record(PROTOCOL_VERSION, REQUEST, new byte[0]);
|
||||
}
|
||||
|
||||
private Record createVersions(int numVersions) {
|
||||
byte[] payload = new byte[numVersions];
|
||||
for (int i = 0; i < payload.length; i++) payload[i] = (byte) i;
|
||||
return new Record(PROTOCOL_VERSION, VERSIONS, payload);
|
||||
}
|
||||
|
||||
private byte[] createPayload() throws Exception {
|
||||
ByteArrayOutputStream payload = new ByteArrayOutputStream();
|
||||
while (payload.size() + UniqueId.LENGTH <= MAX_RECORD_PAYLOAD_BYTES) {
|
||||
|
||||
Reference in New Issue
Block a user