Check periodically for retransmittable packets. Bug #46.

This commit is contained in:
akwizgran
2014-12-14 20:26:41 +00:00
parent 29a6596ee3
commit 388b36b6be
51 changed files with 351 additions and 331 deletions

View File

@@ -69,8 +69,7 @@ class DroidtoothPlugin implements DuplexPlugin {
private final SecureRandom secureRandom; private final SecureRandom secureRandom;
private final Clock clock; private final Clock clock;
private final DuplexPluginCallback callback; private final DuplexPluginCallback callback;
private final int maxFrameLength; private final int maxFrameLength, maxLatency, pollingInterval;
private final long maxLatency, pollingInterval;
private volatile boolean running = false; private volatile boolean running = false;
private volatile boolean wasDisabled = false; private volatile boolean wasDisabled = false;
@@ -82,8 +81,8 @@ class DroidtoothPlugin implements DuplexPlugin {
DroidtoothPlugin(Executor ioExecutor, AndroidExecutor androidExecutor, DroidtoothPlugin(Executor ioExecutor, AndroidExecutor androidExecutor,
Context appContext, SecureRandom secureRandom, Clock clock, Context appContext, SecureRandom secureRandom, Clock clock,
DuplexPluginCallback callback, int maxFrameLength, long maxLatency, DuplexPluginCallback callback, int maxFrameLength, int maxLatency,
long pollingInterval) { int pollingInterval) {
this.ioExecutor = ioExecutor; this.ioExecutor = ioExecutor;
this.androidExecutor = androidExecutor; this.androidExecutor = androidExecutor;
this.appContext = appContext; this.appContext = appContext;
@@ -103,13 +102,13 @@ class DroidtoothPlugin implements DuplexPlugin {
return maxFrameLength; return maxFrameLength;
} }
public long getMaxLatency() { public int getMaxLatency() {
return maxLatency; return maxLatency;
} }
public long getMaxIdleTime() { public int getMaxIdleTime() {
// Bluetooth detects dead connections so we don't need keepalives // Bluetooth detects dead connections so we don't need keepalives
return Long.MAX_VALUE; return Integer.MAX_VALUE;
} }
public boolean start() throws IOException { public boolean start() throws IOException {
@@ -245,7 +244,7 @@ class DroidtoothPlugin implements DuplexPlugin {
return true; return true;
} }
public long getPollingInterval() { public int getPollingInterval() {
return pollingInterval; return pollingInterval;
} }

View File

@@ -16,8 +16,8 @@ import android.content.Context;
public class DroidtoothPluginFactory implements DuplexPluginFactory { public class DroidtoothPluginFactory implements DuplexPluginFactory {
private static final int MAX_FRAME_LENGTH = 1024; private static final int MAX_FRAME_LENGTH = 1024;
private static final long MAX_LATENCY = 60 * 1000; // 1 minute private static final int MAX_LATENCY = 30 * 1000; // 30 seconds
private static final long POLLING_INTERVAL = 3 * 60 * 1000; // 3 minutes private static final int POLLING_INTERVAL = 3 * 60 * 1000; // 3 minutes
private final Executor ioExecutor; private final Executor ioExecutor;
private final AndroidExecutor androidExecutor; private final AndroidExecutor androidExecutor;

View File

@@ -64,11 +64,11 @@ class DroidtoothTransportConnection implements DuplexTransportConnection {
return plugin.getMaxFrameLength(); return plugin.getMaxFrameLength();
} }
public long getMaxLatency() { public int getMaxLatency() {
return plugin.getMaxLatency(); return plugin.getMaxLatency();
} }
public long getMaxIdleTime() { public int getMaxIdleTime() {
return plugin.getMaxIdleTime(); return plugin.getMaxIdleTime();
} }

View File

@@ -26,8 +26,8 @@ class AndroidLanTcpPlugin extends LanTcpPlugin {
private volatile BroadcastReceiver networkStateReceiver = null; private volatile BroadcastReceiver networkStateReceiver = null;
AndroidLanTcpPlugin(Executor ioExecutor, Context appContext, AndroidLanTcpPlugin(Executor ioExecutor, Context appContext,
DuplexPluginCallback callback, int maxFrameLength, long maxLatency, DuplexPluginCallback callback, int maxFrameLength, int maxLatency,
long maxIdleTime, long pollingInterval) { int maxIdleTime, int pollingInterval) {
super(ioExecutor, callback, maxFrameLength, maxLatency, maxIdleTime, super(ioExecutor, callback, maxFrameLength, maxLatency, maxIdleTime,
pollingInterval); pollingInterval);
this.appContext = appContext; this.appContext = appContext;

View File

@@ -12,9 +12,9 @@ import android.content.Context;
public class AndroidLanTcpPluginFactory implements DuplexPluginFactory { public class AndroidLanTcpPluginFactory implements DuplexPluginFactory {
private static final int MAX_FRAME_LENGTH = 1024; private static final int MAX_FRAME_LENGTH = 1024;
private static final long MAX_LATENCY = 60 * 1000; // 1 minute private static final int MAX_LATENCY = 30 * 1000; // 30 seconds
private static final long MAX_IDLE_TIME = 30 * 1000; // 30 seconds private static final int MAX_IDLE_TIME = 30 * 1000; // 30 seconds
private static final long POLLING_INTERVAL = 60 * 1000; // 1 minute private static final int POLLING_INTERVAL = 3 * 60 * 1000; // 3 minutes
private final Executor ioExecutor; private final Executor ioExecutor;
private final Context appContext; private final Context appContext;

View File

@@ -75,8 +75,8 @@ class TorPlugin implements DuplexPlugin, EventHandler {
private final Context appContext; private final Context appContext;
private final LocationUtils locationUtils; private final LocationUtils locationUtils;
private final DuplexPluginCallback callback; private final DuplexPluginCallback callback;
private final int maxFrameLength, socketTimeout; private final int maxFrameLength, maxLatency, maxIdleTime, pollingInterval;
private final long maxLatency, maxIdleTime, pollingInterval; private final int socketTimeout;
private final File torDirectory, torFile, geoIpFile, configFile, doneFile; private final File torDirectory, torFile, geoIpFile, configFile, doneFile;
private final File cookieFile, hostnameFile; private final File cookieFile, hostnameFile;
private final AtomicBoolean circuitBuilt; private final AtomicBoolean circuitBuilt;
@@ -90,8 +90,8 @@ class TorPlugin implements DuplexPlugin, EventHandler {
TorPlugin(Executor ioExecutor, Context appContext, TorPlugin(Executor ioExecutor, Context appContext,
LocationUtils locationUtils, DuplexPluginCallback callback, LocationUtils locationUtils, DuplexPluginCallback callback,
int maxFrameLength, long maxLatency, long maxIdleTime, int maxFrameLength, int maxLatency, int maxIdleTime,
long pollingInterval) { int pollingInterval) {
this.ioExecutor = ioExecutor; this.ioExecutor = ioExecutor;
this.appContext = appContext; this.appContext = appContext;
this.locationUtils = locationUtils; this.locationUtils = locationUtils;
@@ -100,9 +100,9 @@ class TorPlugin implements DuplexPlugin, EventHandler {
this.maxLatency = maxLatency; this.maxLatency = maxLatency;
this.maxIdleTime = maxIdleTime; this.maxIdleTime = maxIdleTime;
this.pollingInterval = pollingInterval; this.pollingInterval = pollingInterval;
if(2 * maxIdleTime > Integer.MAX_VALUE) if(maxIdleTime > Integer.MAX_VALUE / 2)
socketTimeout = Integer.MAX_VALUE; socketTimeout = Integer.MAX_VALUE;
else socketTimeout = (int) (2 * maxIdleTime); else socketTimeout = maxIdleTime * 2;
torDirectory = appContext.getDir("tor", MODE_PRIVATE); torDirectory = appContext.getDir("tor", MODE_PRIVATE);
torFile = new File(torDirectory, "tor"); torFile = new File(torDirectory, "tor");
geoIpFile = new File(torDirectory, "geoip"); geoIpFile = new File(torDirectory, "geoip");
@@ -121,11 +121,11 @@ class TorPlugin implements DuplexPlugin, EventHandler {
return maxFrameLength; return maxFrameLength;
} }
public long getMaxLatency() { public int getMaxLatency() {
return maxLatency; return maxLatency;
} }
public long getMaxIdleTime() { public int getMaxIdleTime() {
return maxIdleTime; return maxIdleTime;
} }
@@ -504,7 +504,7 @@ class TorPlugin implements DuplexPlugin, EventHandler {
return true; return true;
} }
public long getPollingInterval() { public int getPollingInterval() {
return pollingInterval; return pollingInterval;
} }

View File

@@ -18,9 +18,9 @@ public class TorPluginFactory implements DuplexPluginFactory {
Logger.getLogger(TorPluginFactory.class.getName()); Logger.getLogger(TorPluginFactory.class.getName());
private static final int MAX_FRAME_LENGTH = 1024; private static final int MAX_FRAME_LENGTH = 1024;
private static final long MAX_LATENCY = 60 * 1000; // 1 minute private static final int MAX_LATENCY = 30 * 1000; // 30 seconds
private static final long MAX_IDLE_TIME = 30 * 1000; // 30 seconds private static final int MAX_IDLE_TIME = 30 * 1000; // 30 seconds
private static final long POLLING_INTERVAL = 3 * 60 * 1000; // 3 minutes private static final int POLLING_INTERVAL = 3 * 60 * 1000; // 3 minutes
private final Executor ioExecutor; private final Executor ioExecutor;
private final Context appContext; private final Context appContext;

View File

@@ -63,11 +63,11 @@ class TorTransportConnection implements DuplexTransportConnection {
return plugin.getMaxFrameLength(); return plugin.getMaxFrameLength();
} }
public long getMaxLatency() { public int getMaxLatency() {
return plugin.getMaxLatency(); return plugin.getMaxLatency();
} }
public long getMaxIdleTime() { public int getMaxIdleTime() {
return plugin.getMaxIdleTime(); return plugin.getMaxIdleTime();
} }

View File

@@ -20,5 +20,5 @@ public interface KeyManager extends Service {
* Called whenever an endpoint has been added. The initial secret is erased * Called whenever an endpoint has been added. The initial secret is erased
* before returning. * before returning.
*/ */
void endpointAdded(Endpoint ep, long maxLatency, byte[] initialSecret); void endpointAdded(Endpoint ep, int maxLatency, byte[] initialSecret);
} }

View File

@@ -73,7 +73,7 @@ public interface DatabaseComponent {
* Stores a transport and returns true if the transport was not previously * Stores a transport and returns true if the transport was not previously
* in the database. * in the database.
*/ */
boolean addTransport(TransportId t, long maxLatency) throws DbException; boolean addTransport(TransportId t, int maxLatency) throws DbException;
/** /**
* Returns an acknowledgement for the given contact, or null if there are * Returns an acknowledgement for the given contact, or null if there are
@@ -88,14 +88,14 @@ public interface DatabaseComponent {
* sendable messages that fit in the given length. * sendable messages that fit in the given length.
*/ */
Collection<byte[]> generateBatch(ContactId c, int maxLength, Collection<byte[]> generateBatch(ContactId c, int maxLength,
long maxLatency) throws DbException; int maxLatency) throws DbException;
/** /**
* Returns an offer for the given contact for transmission over a * Returns an offer for the given contact for transmission over a
* transport with the given maximum latency, or null if there are no * transport with the given maximum latency, or null if there are no
* messages to offer. * messages to offer.
*/ */
Offer generateOffer(ContactId c, int maxMessages, long maxLatency) Offer generateOffer(ContactId c, int maxMessages, int maxLatency)
throws DbException; throws DbException;
/** /**
@@ -112,7 +112,7 @@ public interface DatabaseComponent {
* sendable messages that fit in the given length. * sendable messages that fit in the given length.
*/ */
Collection<byte[]> generateRequestedBatch(ContactId c, int maxLength, Collection<byte[]> generateRequestedBatch(ContactId c, int maxLength,
long maxLatency) throws DbException; int maxLatency) throws DbException;
/** /**
* Returns a retention ack for the given contact, or null if no retention * Returns a retention ack for the given contact, or null if no retention
@@ -125,7 +125,7 @@ public interface DatabaseComponent {
* over a transport with the given latency. Returns null if no update is * over a transport with the given latency. Returns null if no update is
* due. * due.
*/ */
RetentionUpdate generateRetentionUpdate(ContactId c, long maxLatency) RetentionUpdate generateRetentionUpdate(ContactId c, int maxLatency)
throws DbException; throws DbException;
/** /**
@@ -139,7 +139,7 @@ public interface DatabaseComponent {
* over a transport with the given latency. Returns null if no update is * over a transport with the given latency. Returns null if no update is
* due. * due.
*/ */
SubscriptionUpdate generateSubscriptionUpdate(ContactId c, long maxLatency) SubscriptionUpdate generateSubscriptionUpdate(ContactId c, int maxLatency)
throws DbException; throws DbException;
/** /**
@@ -155,7 +155,7 @@ public interface DatabaseComponent {
* updates are due. * updates are due.
*/ */
Collection<TransportUpdate> generateTransportUpdates(ContactId c, Collection<TransportUpdate> generateTransportUpdates(ContactId c,
long maxLatency) throws DbException; int maxLatency) throws DbException;
/** /**
* Returns the status of all groups to which the user subscribes or can * Returns the status of all groups to which the user subscribes or can
@@ -227,8 +227,8 @@ public interface DatabaseComponent {
/** Returns all contacts who subscribe to the given group. */ /** Returns all contacts who subscribe to the given group. */
Collection<Contact> getSubscribers(GroupId g) throws DbException; Collection<Contact> getSubscribers(GroupId g) throws DbException;
/** Returns the maximum latencies of all local transports. */ /** Returns the maximum latencies of all supported transports. */
Map<TransportId, Long> getTransportLatencies() throws DbException; Map<TransportId, Integer> getTransportLatencies() throws DbException;
/** Returns the number of unread messages in each subscribed group. */ /** Returns the number of unread messages in each subscribed group. */
Map<GroupId, Integer> getUnreadMessageCounts() throws DbException; Map<GroupId, Integer> getUnreadMessageCounts() throws DbException;

View File

@@ -6,9 +6,9 @@ import org.briarproject.api.TransportId;
public class TransportAddedEvent extends Event { public class TransportAddedEvent extends Event {
private final TransportId transportId; private final TransportId transportId;
private final long maxLatency; private final int maxLatency;
public TransportAddedEvent(TransportId transportId, long maxLatency) { public TransportAddedEvent(TransportId transportId, int maxLatency) {
this.transportId = transportId; this.transportId = transportId;
this.maxLatency = maxLatency; this.maxLatency = maxLatency;
} }
@@ -17,7 +17,7 @@ public class TransportAddedEvent extends Event {
return transportId; return transportId;
} }
public long getMaxLatency() { public int getMaxLatency() {
return maxLatency; return maxLatency;
} }
} }

View File

@@ -12,8 +12,8 @@ public interface MessagingSessionFactory {
InputStream in); InputStream in);
MessagingSession createSimplexOutgoingSession(ContactId c, TransportId t, MessagingSession createSimplexOutgoingSession(ContactId c, TransportId t,
long maxLatency, OutputStream out); int maxLatency, OutputStream out);
MessagingSession createDuplexOutgoingSession(ContactId c, TransportId t, MessagingSession createDuplexOutgoingSession(ContactId c, TransportId t,
long maxLatency, long maxIdleTime, OutputStream out); int maxLatency, int maxIdleTime, OutputStream out);
} }

View File

@@ -29,4 +29,6 @@ public interface PacketWriter {
void writeTransportAck(TransportAck a) throws IOException; void writeTransportAck(TransportAck a) throws IOException;
void writeTransportUpdate(TransportUpdate u) throws IOException; void writeTransportUpdate(TransportUpdate u) throws IOException;
void flush() throws IOException;
} }

View File

@@ -15,10 +15,10 @@ public interface Plugin {
int getMaxFrameLength(); int getMaxFrameLength();
/** Returns the transport's maximum latency in milliseconds. */ /** Returns the transport's maximum latency in milliseconds. */
long getMaxLatency(); int getMaxLatency();
/** Returns the transport's maximum idle time in milliseconds. */ /** Returns the transport's maximum idle time in milliseconds. */
long getMaxIdleTime(); int getMaxIdleTime();
/** Starts the plugin and returns true if it started successfully. */ /** Starts the plugin and returns true if it started successfully. */
boolean start() throws IOException; boolean start() throws IOException;
@@ -39,7 +39,7 @@ public interface Plugin {
* Returns the desired interval in milliseconds between calls to the * Returns the desired interval in milliseconds between calls to the
* plugin's {@link #poll(Collection)} method. * plugin's {@link #poll(Collection)} method.
*/ */
long getPollingInterval(); int getPollingInterval();
/** /**
* Attempts to establish connections to contacts, passing any created * Attempts to establish connections to contacts, passing any created

View File

@@ -13,10 +13,10 @@ public interface TransportConnectionWriter {
int getMaxFrameLength(); int getMaxFrameLength();
/** Returns the maximum latency of the transport in milliseconds. */ /** Returns the maximum latency of the transport in milliseconds. */
long getMaxLatency(); int getMaxLatency();
/** Returns the maximum idle time of the transport in milliseconds. */ /** Returns the maximum idle time of the transport in milliseconds. */
long getMaxIdleTime(); int getMaxIdleTime();
/** Returns the capacity of the transport connection in bytes. */ /** Returns the capacity of the transport connection in bytes. */
long getCapacity(); long getCapacity();

View File

@@ -152,7 +152,7 @@ interface Database<T> {
* <p> * <p>
* Locking: write. * Locking: write.
*/ */
boolean addTransport(T txn, TransportId t, long maxLatency) boolean addTransport(T txn, TransportId t, int maxLatency)
throws DbException; throws DbException;
/** /**
@@ -460,7 +460,7 @@ interface Database<T> {
* <p> * <p>
* Locking: write. * Locking: write.
*/ */
RetentionUpdate getRetentionUpdate(T txn, ContactId c, long maxLatency) RetentionUpdate getRetentionUpdate(T txn, ContactId c, int maxLatency)
throws DbException; throws DbException;
/** /**
@@ -499,7 +499,7 @@ interface Database<T> {
* Locking: write. * Locking: write.
*/ */
SubscriptionUpdate getSubscriptionUpdate(T txn, ContactId c, SubscriptionUpdate getSubscriptionUpdate(T txn, ContactId c,
long maxLatency) throws DbException; int maxLatency) throws DbException;
/** /**
* Returns a collection of transport acks for the given contact, or null if * Returns a collection of transport acks for the given contact, or null if
@@ -511,11 +511,11 @@ interface Database<T> {
throws DbException; throws DbException;
/** /**
* Returns the maximum latencies of all local transports. * Returns the maximum latencies of all supported transports.
* <p> * <p>
* Locking: read. * Locking: read.
*/ */
Map<TransportId, Long> getTransportLatencies(T txn) throws DbException; Map<TransportId, Integer> getTransportLatencies(T txn) throws DbException;
/** /**
* Returns a collection of transport updates for the given contact and * Returns a collection of transport updates for the given contact and
@@ -525,7 +525,7 @@ interface Database<T> {
* Locking: write. * Locking: write.
*/ */
Collection<TransportUpdate> getTransportUpdates(T txn, ContactId c, Collection<TransportUpdate> getTransportUpdates(T txn, ContactId c,
long maxLatency) throws DbException; int maxLatency) throws DbException;
/** /**
* Returns the number of unread messages in each subscribed group. * Returns the number of unread messages in each subscribed group.
@@ -798,6 +798,6 @@ interface Database<T> {
* <p> * <p>
* Locking: write. * Locking: write.
*/ */
void updateExpiryTime(T txn, ContactId c, MessageId m, long maxLatency) void updateExpiryTime(T txn, ContactId c, MessageId m, int maxLatency)
throws DbException; throws DbException;
} }

View File

@@ -314,7 +314,7 @@ DatabaseCleaner.Callback {
} }
} }
public boolean addTransport(TransportId t, long maxLatency) public boolean addTransport(TransportId t, int maxLatency)
throws DbException { throws DbException {
boolean added; boolean added;
lock.writeLock().lock(); lock.writeLock().lock();
@@ -357,7 +357,7 @@ DatabaseCleaner.Callback {
} }
public Collection<byte[]> generateBatch(ContactId c, int maxLength, public Collection<byte[]> generateBatch(ContactId c, int maxLength,
long maxLatency) throws DbException { int maxLatency) throws DbException {
Collection<MessageId> ids; Collection<MessageId> ids;
List<byte[]> messages = new ArrayList<byte[]>(); List<byte[]> messages = new ArrayList<byte[]>();
lock.writeLock().lock(); lock.writeLock().lock();
@@ -384,7 +384,7 @@ DatabaseCleaner.Callback {
return Collections.unmodifiableList(messages); return Collections.unmodifiableList(messages);
} }
public Offer generateOffer(ContactId c, int maxMessages, long maxLatency) public Offer generateOffer(ContactId c, int maxMessages, int maxLatency)
throws DbException { throws DbException {
Collection<MessageId> ids; Collection<MessageId> ids;
lock.writeLock().lock(); lock.writeLock().lock();
@@ -432,7 +432,7 @@ DatabaseCleaner.Callback {
} }
public Collection<byte[]> generateRequestedBatch(ContactId c, int maxLength, public Collection<byte[]> generateRequestedBatch(ContactId c, int maxLength,
long maxLatency) throws DbException { int maxLatency) throws DbException {
Collection<MessageId> ids; Collection<MessageId> ids;
List<byte[]> messages = new ArrayList<byte[]>(); List<byte[]> messages = new ArrayList<byte[]>();
lock.writeLock().lock(); lock.writeLock().lock();
@@ -478,7 +478,7 @@ DatabaseCleaner.Callback {
} }
} }
public RetentionUpdate generateRetentionUpdate(ContactId c, long maxLatency) public RetentionUpdate generateRetentionUpdate(ContactId c, int maxLatency)
throws DbException { throws DbException {
lock.writeLock().lock(); lock.writeLock().lock();
try { try {
@@ -519,7 +519,7 @@ DatabaseCleaner.Callback {
} }
public SubscriptionUpdate generateSubscriptionUpdate(ContactId c, public SubscriptionUpdate generateSubscriptionUpdate(ContactId c,
long maxLatency) throws DbException { int maxLatency) throws DbException {
lock.writeLock().lock(); lock.writeLock().lock();
try { try {
T txn = db.startTransaction(); T txn = db.startTransaction();
@@ -560,7 +560,7 @@ DatabaseCleaner.Callback {
} }
public Collection<TransportUpdate> generateTransportUpdates(ContactId c, public Collection<TransportUpdate> generateTransportUpdates(ContactId c,
long maxLatency) throws DbException { int maxLatency) throws DbException {
lock.writeLock().lock(); lock.writeLock().lock();
try { try {
T txn = db.startTransaction(); T txn = db.startTransaction();
@@ -932,12 +932,13 @@ DatabaseCleaner.Callback {
} }
} }
public Map<TransportId, Long> getTransportLatencies() throws DbException { public Map<TransportId, Integer> getTransportLatencies()
throws DbException {
lock.readLock().lock(); lock.readLock().lock();
try { try {
T txn = db.startTransaction(); T txn = db.startTransaction();
try { try {
Map<TransportId, Long> latencies = Map<TransportId, Integer> latencies =
db.getTransportLatencies(txn); db.getTransportLatencies(txn);
db.commitTransaction(txn); db.commitTransaction(txn);
return latencies; return latencies;

View File

@@ -11,13 +11,12 @@ class ExponentialBackoff {
* transmissions increases exponentially. If the expiry time would * transmissions increases exponentially. If the expiry time would
* be greater than Long.MAX_VALUE, Long.MAX_VALUE is returned. * be greater than Long.MAX_VALUE, Long.MAX_VALUE is returned.
*/ */
static long calculateExpiry(long now, long maxLatency, int txCount) { static long calculateExpiry(long now, int maxLatency, int txCount) {
if(now < 0) throw new IllegalArgumentException(); if(now < 0) throw new IllegalArgumentException();
if(maxLatency <= 0) throw new IllegalArgumentException(); if(maxLatency <= 0) throw new IllegalArgumentException();
if(txCount < 0) throw new IllegalArgumentException(); if(txCount < 0) throw new IllegalArgumentException();
// The maximum round-trip time is twice the maximum latency // The maximum round-trip time is twice the maximum latency
long roundTrip = maxLatency * 2; long roundTrip = maxLatency * 2L;
if(roundTrip < 0) return Long.MAX_VALUE;
// The interval between transmissions is roundTrip * 2 ^ txCount // The interval between transmissions is roundTrip * 2 ^ txCount
for(int i = 0; i < txCount; i++) { for(int i = 0; i < txCount; i++) {
roundTrip <<= 1; roundTrip <<= 1;

View File

@@ -62,8 +62,8 @@ import org.briarproject.api.transport.TemporarySecret;
*/ */
abstract class JdbcDatabase implements Database<Connection> { abstract class JdbcDatabase implements Database<Connection> {
private static final int SCHEMA_VERSION = 6; private static final int SCHEMA_VERSION = 7;
private static final int MIN_SCHEMA_VERSION = 5; private static final int MIN_SCHEMA_VERSION = 7;
private static final String CREATE_SETTINGS = private static final String CREATE_SETTINGS =
"CREATE TABLE settings" "CREATE TABLE settings"
@@ -213,7 +213,7 @@ abstract class JdbcDatabase implements Database<Connection> {
private static final String CREATE_TRANSPORTS = private static final String CREATE_TRANSPORTS =
"CREATE TABLE transports" "CREATE TABLE transports"
+ " (transportId VARCHAR NOT NULL," + " (transportId VARCHAR NOT NULL,"
+ " maxLatency BIGINT NOT NULL," + " maxLatency INT NOT NULL,"
+ " PRIMARY KEY (transportId))"; + " PRIMARY KEY (transportId))";
private static final String CREATE_TRANSPORT_CONFIGS = private static final String CREATE_TRANSPORT_CONFIGS =
@@ -866,7 +866,7 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
} }
public boolean addTransport(Connection txn, TransportId t, long maxLatency) public boolean addTransport(Connection txn, TransportId t, int maxLatency)
throws DbException { throws DbException {
PreparedStatement ps = null; PreparedStatement ps = null;
ResultSet rs = null; ResultSet rs = null;
@@ -2024,7 +2024,7 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
public RetentionUpdate getRetentionUpdate(Connection txn, ContactId c, public RetentionUpdate getRetentionUpdate(Connection txn, ContactId c,
long maxLatency) throws DbException { int maxLatency) throws DbException {
long now = clock.currentTimeMillis(); long now = clock.currentTimeMillis();
PreparedStatement ps = null; PreparedStatement ps = null;
ResultSet rs = null; ResultSet rs = null;
@@ -2202,7 +2202,7 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
public SubscriptionUpdate getSubscriptionUpdate(Connection txn, ContactId c, public SubscriptionUpdate getSubscriptionUpdate(Connection txn, ContactId c,
long maxLatency) throws DbException { int maxLatency) throws DbException {
long now = clock.currentTimeMillis(); long now = clock.currentTimeMillis();
PreparedStatement ps = null; PreparedStatement ps = null;
ResultSet rs = null; ResultSet rs = null;
@@ -2296,7 +2296,7 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
} }
public Map<TransportId, Long> getTransportLatencies(Connection txn) public Map<TransportId, Integer> getTransportLatencies(Connection txn)
throws DbException { throws DbException {
PreparedStatement ps = null; PreparedStatement ps = null;
ResultSet rs = null; ResultSet rs = null;
@@ -2304,10 +2304,11 @@ abstract class JdbcDatabase implements Database<Connection> {
String sql = "SELECT transportId, maxLatency FROM transports"; String sql = "SELECT transportId, maxLatency FROM transports";
ps = txn.prepareStatement(sql); ps = txn.prepareStatement(sql);
rs = ps.executeQuery(); rs = ps.executeQuery();
Map<TransportId, Long> latencies = new HashMap<TransportId, Long>(); Map<TransportId, Integer> latencies =
new HashMap<TransportId, Integer>();
while(rs.next()){ while(rs.next()){
TransportId id = new TransportId(rs.getString(1)); TransportId id = new TransportId(rs.getString(1));
latencies.put(id, rs.getLong(2)); latencies.put(id, rs.getInt(2));
} }
rs.close(); rs.close();
ps.close(); ps.close();
@@ -2320,7 +2321,7 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
public Collection<TransportUpdate> getTransportUpdates(Connection txn, public Collection<TransportUpdate> getTransportUpdates(Connection txn,
ContactId c, long maxLatency) throws DbException { ContactId c, int maxLatency) throws DbException {
long now = clock.currentTimeMillis(); long now = clock.currentTimeMillis();
PreparedStatement ps = null; PreparedStatement ps = null;
ResultSet rs = null; ResultSet rs = null;
@@ -3301,7 +3302,7 @@ abstract class JdbcDatabase implements Database<Connection> {
} }
public void updateExpiryTime(Connection txn, ContactId c, MessageId m, public void updateExpiryTime(Connection txn, ContactId c, MessageId m,
long maxLatency) throws DbException { int maxLatency) throws DbException {
PreparedStatement ps = null; PreparedStatement ps = null;
ResultSet rs = null; ResultSet rs = null;
try { try {

View File

@@ -285,7 +285,7 @@ abstract class Connector extends Thread {
db.setRemoteProperties(contactId, remoteProps); db.setRemoteProperties(contactId, remoteProps);
// Create an endpoint for each transport shared with the contact // Create an endpoint for each transport shared with the contact
List<TransportId> ids = new ArrayList<TransportId>(); List<TransportId> ids = new ArrayList<TransportId>();
Map<TransportId, Long> latencies = db.getTransportLatencies(); Map<TransportId, Integer> latencies = db.getTransportLatencies();
for(TransportId id : localProps.keySet()) { for(TransportId id : localProps.keySet()) {
if(latencies.containsKey(id) && remoteProps.containsKey(id)) if(latencies.containsKey(id) && remoteProps.containsKey(id))
ids.add(id); ids.add(id);
@@ -296,7 +296,7 @@ abstract class Connector extends Thread {
for(int i = 0; i < size; i++) { for(int i = 0; i < size; i++) {
TransportId id = ids.get(i); TransportId id = ids.get(i);
Endpoint ep = new Endpoint(contactId, id, epoch, alice); Endpoint ep = new Endpoint(contactId, id, epoch, alice);
long maxLatency = latencies.get(id); int maxLatency = latencies.get(id);
try { try {
db.addEndpoint(ep); db.addEndpoint(ep);
} catch(NoSuchTransportException e) { } catch(NoSuchTransportException e) {

View File

@@ -6,7 +6,6 @@ import static java.util.logging.Level.WARNING;
import static org.briarproject.api.messaging.MessagingConstants.MAX_PACKET_LENGTH; import static org.briarproject.api.messaging.MessagingConstants.MAX_PACKET_LENGTH;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@@ -37,7 +36,6 @@ import org.briarproject.api.messaging.Ack;
import org.briarproject.api.messaging.MessagingSession; import org.briarproject.api.messaging.MessagingSession;
import org.briarproject.api.messaging.Offer; import org.briarproject.api.messaging.Offer;
import org.briarproject.api.messaging.PacketWriter; import org.briarproject.api.messaging.PacketWriter;
import org.briarproject.api.messaging.PacketWriterFactory;
import org.briarproject.api.messaging.Request; import org.briarproject.api.messaging.Request;
import org.briarproject.api.messaging.RetentionAck; import org.briarproject.api.messaging.RetentionAck;
import org.briarproject.api.messaging.RetentionUpdate; import org.briarproject.api.messaging.RetentionUpdate;
@@ -45,16 +43,18 @@ import org.briarproject.api.messaging.SubscriptionAck;
import org.briarproject.api.messaging.SubscriptionUpdate; import org.briarproject.api.messaging.SubscriptionUpdate;
import org.briarproject.api.messaging.TransportAck; import org.briarproject.api.messaging.TransportAck;
import org.briarproject.api.messaging.TransportUpdate; import org.briarproject.api.messaging.TransportUpdate;
import org.briarproject.api.system.Clock;
/** /**
* An outgoing {@link org.briarproject.api.messaging.MessagingSession * An outgoing {@link org.briarproject.api.messaging.MessagingSession
* MessagingSession} suitable for duplex transports. The session offers * MessagingSession} suitable for duplex transports. The session offers
* messages before sending them, keeps its output stream open when there are no * messages before sending them, keeps its output stream open when there are no
* more packets to send, and reacts to events that make packets available to * packets to send, and reacts to events that make packets available to send.
* send.
*/ */
class DuplexOutgoingSession implements MessagingSession, EventListener { class DuplexOutgoingSession implements MessagingSession, EventListener {
// Check for retransmittable packets once every 60 seconds
private static final int RETX_QUERY_INTERVAL = 60 * 1000;
private static final Logger LOG = private static final Logger LOG =
Logger.getLogger(DuplexOutgoingSession.class.getName()); Logger.getLogger(DuplexOutgoingSession.class.getName());
@@ -66,28 +66,32 @@ class DuplexOutgoingSession implements MessagingSession, EventListener {
private final DatabaseComponent db; private final DatabaseComponent db;
private final Executor dbExecutor; private final Executor dbExecutor;
private final EventBus eventBus; private final EventBus eventBus;
private final Clock clock;
private final ContactId contactId; private final ContactId contactId;
private final TransportId transportId; private final TransportId transportId;
private final long maxLatency, maxIdleTime; private final int maxLatency, maxIdleTime;
private final OutputStream out;
private final PacketWriter packetWriter; private final PacketWriter packetWriter;
private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks; private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
// The following must only be accessed on the writer thread
private long nextKeepalive = 0, nextRetxQuery = 0;
private boolean dataToFlush = true;
private volatile boolean interrupted = false; private volatile boolean interrupted = false;
DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
EventBus eventBus, PacketWriterFactory packetWriterFactory, EventBus eventBus, Clock clock, ContactId contactId,
ContactId contactId, TransportId transportId, long maxLatency, TransportId transportId, int maxLatency, int maxIdleTime,
long maxIdleTime, OutputStream out) { PacketWriter packetWriter) {
this.db = db; this.db = db;
this.dbExecutor = dbExecutor; this.dbExecutor = dbExecutor;
this.eventBus = eventBus; this.eventBus = eventBus;
this.clock = clock;
this.contactId = contactId; this.contactId = contactId;
this.transportId = transportId; this.transportId = transportId;
this.maxLatency = maxLatency; this.maxLatency = maxLatency;
this.maxIdleTime = maxIdleTime; this.maxIdleTime = maxIdleTime;
this.out = out; this.packetWriter = packetWriter;
packetWriter = packetWriterFactory.createPacketWriter(out);
writerTasks = new LinkedBlockingQueue<ThrowingRunnable<IOException>>(); writerTasks = new LinkedBlockingQueue<ThrowingRunnable<IOException>>();
} }
@@ -105,21 +109,50 @@ class DuplexOutgoingSession implements MessagingSession, EventListener {
dbExecutor.execute(new GenerateBatch()); dbExecutor.execute(new GenerateBatch());
dbExecutor.execute(new GenerateOffer()); dbExecutor.execute(new GenerateOffer());
dbExecutor.execute(new GenerateRequest()); dbExecutor.execute(new GenerateRequest());
long now = clock.currentTimeMillis();
nextKeepalive = now + maxIdleTime;
nextRetxQuery = now + RETX_QUERY_INTERVAL;
// Write packets until interrupted // Write packets until interrupted
try { try {
while(!interrupted) { while(!interrupted) {
// Flush the stream if it's going to be idle // Work out how long we should wait for a packet
if(writerTasks.isEmpty()) out.flush(); now = clock.currentTimeMillis();
ThrowingRunnable<IOException> task = long wait = Math.min(nextKeepalive, nextRetxQuery) - now;
writerTasks.poll(maxIdleTime, MILLISECONDS); if(wait < 0) wait = 0;
if(task == null) { // Flush any unflushed data if we're going to wait
LOG.info("Idle timeout"); if(wait > 0 && dataToFlush && writerTasks.isEmpty()) {
continue; // Flush and wait again packetWriter.flush();
dataToFlush = false;
nextKeepalive = now + maxIdleTime;
}
// Wait for a packet
ThrowingRunnable<IOException> task = writerTasks.poll(wait,
MILLISECONDS);
if(task == null) {
now = clock.currentTimeMillis();
if(now >= nextRetxQuery) {
// Check for retransmittable packets
dbExecutor.execute(new GenerateTransportUpdates());
dbExecutor.execute(new GenerateSubscriptionUpdate());
dbExecutor.execute(new GenerateRetentionUpdate());
dbExecutor.execute(new GenerateBatch());
dbExecutor.execute(new GenerateOffer());
nextRetxQuery = now + RETX_QUERY_INTERVAL;
}
if(now >= nextKeepalive) {
// Flush the stream to keep it alive
packetWriter.flush();
dataToFlush = false;
nextKeepalive = now + maxIdleTime;
}
} else if(task == CLOSE) {
break;
} else {
task.run();
dataToFlush = true;
} }
if(task == CLOSE) break;
task.run();
} }
out.flush(); if(dataToFlush) packetWriter.flush();
} catch(InterruptedException e) { } catch(InterruptedException e) {
LOG.info("Interrupted while waiting for a packet to write"); LOG.info("Interrupted while waiting for a packet to write");
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();

View File

@@ -3,7 +3,6 @@ package org.briarproject.messaging;
import static java.util.logging.Level.WARNING; import static java.util.logging.Level.WARNING;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException; import java.security.GeneralSecurityException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.logging.Logger; import java.util.logging.Logger;
@@ -25,7 +24,6 @@ import org.briarproject.api.messaging.MessageVerifier;
import org.briarproject.api.messaging.MessagingSession; import org.briarproject.api.messaging.MessagingSession;
import org.briarproject.api.messaging.Offer; import org.briarproject.api.messaging.Offer;
import org.briarproject.api.messaging.PacketReader; import org.briarproject.api.messaging.PacketReader;
import org.briarproject.api.messaging.PacketReaderFactory;
import org.briarproject.api.messaging.Request; import org.briarproject.api.messaging.Request;
import org.briarproject.api.messaging.RetentionAck; import org.briarproject.api.messaging.RetentionAck;
import org.briarproject.api.messaging.RetentionUpdate; import org.briarproject.api.messaging.RetentionUpdate;
@@ -56,9 +54,8 @@ class IncomingSession implements MessagingSession, EventListener {
IncomingSession(DatabaseComponent db, Executor dbExecutor, IncomingSession(DatabaseComponent db, Executor dbExecutor,
Executor cryptoExecutor, EventBus eventBus, Executor cryptoExecutor, EventBus eventBus,
MessageVerifier messageVerifier, MessageVerifier messageVerifier, ContactId contactId,
PacketReaderFactory packetReaderFactory, ContactId contactId, TransportId transportId, PacketReader packetReader) {
TransportId transportId, InputStream in) {
this.db = db; this.db = db;
this.dbExecutor = dbExecutor; this.dbExecutor = dbExecutor;
this.cryptoExecutor = cryptoExecutor; this.cryptoExecutor = cryptoExecutor;
@@ -66,7 +63,7 @@ class IncomingSession implements MessagingSession, EventListener {
this.messageVerifier = messageVerifier; this.messageVerifier = messageVerifier;
this.contactId = contactId; this.contactId = contactId;
this.transportId = transportId; this.transportId = transportId;
packetReader = packetReaderFactory.createPacketReader(in); this.packetReader = packetReader;
} }
public void run() throws IOException { public void run() throws IOException {

View File

@@ -15,8 +15,11 @@ import org.briarproject.api.event.EventBus;
import org.briarproject.api.messaging.MessageVerifier; import org.briarproject.api.messaging.MessageVerifier;
import org.briarproject.api.messaging.MessagingSession; import org.briarproject.api.messaging.MessagingSession;
import org.briarproject.api.messaging.MessagingSessionFactory; import org.briarproject.api.messaging.MessagingSessionFactory;
import org.briarproject.api.messaging.PacketReader;
import org.briarproject.api.messaging.PacketReaderFactory; import org.briarproject.api.messaging.PacketReaderFactory;
import org.briarproject.api.messaging.PacketWriter;
import org.briarproject.api.messaging.PacketWriterFactory; import org.briarproject.api.messaging.PacketWriterFactory;
import org.briarproject.api.system.Clock;
class MessagingSessionFactoryImpl implements MessagingSessionFactory { class MessagingSessionFactoryImpl implements MessagingSessionFactory {
@@ -24,6 +27,7 @@ class MessagingSessionFactoryImpl implements MessagingSessionFactory {
private final Executor dbExecutor, cryptoExecutor; private final Executor dbExecutor, cryptoExecutor;
private final MessageVerifier messageVerifier; private final MessageVerifier messageVerifier;
private final EventBus eventBus; private final EventBus eventBus;
private final Clock clock;
private final PacketReaderFactory packetReaderFactory; private final PacketReaderFactory packetReaderFactory;
private final PacketWriterFactory packetWriterFactory; private final PacketWriterFactory packetWriterFactory;
@@ -31,7 +35,7 @@ class MessagingSessionFactoryImpl implements MessagingSessionFactory {
MessagingSessionFactoryImpl(DatabaseComponent db, MessagingSessionFactoryImpl(DatabaseComponent db,
@DatabaseExecutor Executor dbExecutor, @DatabaseExecutor Executor dbExecutor,
@CryptoExecutor Executor cryptoExecutor, @CryptoExecutor Executor cryptoExecutor,
MessageVerifier messageVerifier, EventBus eventBus, MessageVerifier messageVerifier, EventBus eventBus, Clock clock,
PacketReaderFactory packetReaderFactory, PacketReaderFactory packetReaderFactory,
PacketWriterFactory packetWriterFactory) { PacketWriterFactory packetWriterFactory) {
this.db = db; this.db = db;
@@ -39,26 +43,29 @@ class MessagingSessionFactoryImpl implements MessagingSessionFactory {
this.cryptoExecutor = cryptoExecutor; this.cryptoExecutor = cryptoExecutor;
this.messageVerifier = messageVerifier; this.messageVerifier = messageVerifier;
this.eventBus = eventBus; this.eventBus = eventBus;
this.clock = clock;
this.packetReaderFactory = packetReaderFactory; this.packetReaderFactory = packetReaderFactory;
this.packetWriterFactory = packetWriterFactory; this.packetWriterFactory = packetWriterFactory;
} }
public MessagingSession createIncomingSession(ContactId c, TransportId t, public MessagingSession createIncomingSession(ContactId c, TransportId t,
InputStream in) { InputStream in) {
PacketReader packetReader = packetReaderFactory.createPacketReader(in);
return new IncomingSession(db, dbExecutor, cryptoExecutor, eventBus, return new IncomingSession(db, dbExecutor, cryptoExecutor, eventBus,
messageVerifier, packetReaderFactory, c, t, in); messageVerifier, c, t, packetReader);
} }
public MessagingSession createSimplexOutgoingSession(ContactId c, public MessagingSession createSimplexOutgoingSession(ContactId c,
TransportId t, long maxLatency, OutputStream out) { TransportId t, int maxLatency, OutputStream out) {
return new SimplexOutgoingSession(db, dbExecutor, eventBus, PacketWriter packetWriter = packetWriterFactory.createPacketWriter(out);
packetWriterFactory, c, t, maxLatency, out); return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, t,
maxLatency, packetWriter);
} }
public MessagingSession createDuplexOutgoingSession(ContactId c, public MessagingSession createDuplexOutgoingSession(ContactId c,
TransportId t, long maxLatency, long maxIdleTime, TransportId t, int maxLatency, int maxIdleTime, OutputStream out) {
OutputStream out) { PacketWriter packetWriter = packetWriterFactory.createPacketWriter(out);
return new DuplexOutgoingSession(db, dbExecutor, eventBus, return new DuplexOutgoingSession(db, dbExecutor, eventBus, clock, c, t,
packetWriterFactory, c, t, maxLatency, maxIdleTime, out); maxLatency, maxIdleTime, packetWriter);
} }
} }

View File

@@ -143,4 +143,8 @@ class PacketWriterImpl implements PacketWriter {
w.writeInteger(u.getVersion()); w.writeInteger(u.getVersion());
w.writeStructEnd(); w.writeStructEnd();
} }
public void flush() throws IOException {
out.flush();
}
} }

View File

@@ -5,7 +5,6 @@ import static java.util.logging.Level.WARNING;
import static org.briarproject.api.messaging.MessagingConstants.MAX_PACKET_LENGTH; import static org.briarproject.api.messaging.MessagingConstants.MAX_PACKET_LENGTH;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@@ -26,7 +25,6 @@ import org.briarproject.api.event.TransportRemovedEvent;
import org.briarproject.api.messaging.Ack; import org.briarproject.api.messaging.Ack;
import org.briarproject.api.messaging.MessagingSession; import org.briarproject.api.messaging.MessagingSession;
import org.briarproject.api.messaging.PacketWriter; import org.briarproject.api.messaging.PacketWriter;
import org.briarproject.api.messaging.PacketWriterFactory;
import org.briarproject.api.messaging.RetentionAck; import org.briarproject.api.messaging.RetentionAck;
import org.briarproject.api.messaging.RetentionUpdate; import org.briarproject.api.messaging.RetentionUpdate;
import org.briarproject.api.messaging.SubscriptionAck; import org.briarproject.api.messaging.SubscriptionAck;
@@ -55,8 +53,7 @@ class SimplexOutgoingSession implements MessagingSession, EventListener {
private final EventBus eventBus; private final EventBus eventBus;
private final ContactId contactId; private final ContactId contactId;
private final TransportId transportId; private final TransportId transportId;
private final long maxLatency; private final int maxLatency;
private final OutputStream out;
private final PacketWriter packetWriter; private final PacketWriter packetWriter;
private final AtomicInteger outstandingQueries; private final AtomicInteger outstandingQueries;
private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks; private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
@@ -64,17 +61,15 @@ class SimplexOutgoingSession implements MessagingSession, EventListener {
private volatile boolean interrupted = false; private volatile boolean interrupted = false;
SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
EventBus eventBus, PacketWriterFactory packetWriterFactory, EventBus eventBus, ContactId contactId, TransportId transportId,
ContactId contactId, TransportId transportId, long maxLatency, int maxLatency, PacketWriter packetWriter) {
OutputStream out) {
this.db = db; this.db = db;
this.dbExecutor = dbExecutor; this.dbExecutor = dbExecutor;
this.eventBus = eventBus; this.eventBus = eventBus;
this.contactId = contactId; this.contactId = contactId;
this.transportId = transportId; this.transportId = transportId;
this.maxLatency = maxLatency; this.maxLatency = maxLatency;
this.out = out; this.packetWriter = packetWriter;
packetWriter = packetWriterFactory.createPacketWriter(out);
outstandingQueries = new AtomicInteger(8); // One per type of packet outstandingQueries = new AtomicInteger(8); // One per type of packet
writerTasks = new LinkedBlockingQueue<ThrowingRunnable<IOException>>(); writerTasks = new LinkedBlockingQueue<ThrowingRunnable<IOException>>();
} }
@@ -98,7 +93,7 @@ class SimplexOutgoingSession implements MessagingSession, EventListener {
if(task == CLOSE) break; if(task == CLOSE) break;
task.run(); task.run();
} }
out.flush(); packetWriter.flush();
} catch(InterruptedException e) { } catch(InterruptedException e) {
LOG.info("Interrupted while waiting for a packet to write"); LOG.info("Interrupted while waiting for a packet to write");
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();

View File

@@ -28,8 +28,7 @@ public abstract class FilePlugin implements SimplexPlugin {
protected final Executor ioExecutor; protected final Executor ioExecutor;
protected final FileUtils fileUtils; protected final FileUtils fileUtils;
protected final SimplexPluginCallback callback; protected final SimplexPluginCallback callback;
protected final int maxFrameLength; protected final int maxFrameLength, maxLatency;
protected final long maxLatency;
protected volatile boolean running = false; protected volatile boolean running = false;
@@ -40,7 +39,7 @@ public abstract class FilePlugin implements SimplexPlugin {
protected FilePlugin(Executor ioExecutor, FileUtils fileUtils, protected FilePlugin(Executor ioExecutor, FileUtils fileUtils,
SimplexPluginCallback callback, int maxFrameLength, SimplexPluginCallback callback, int maxFrameLength,
long maxLatency) { int maxLatency) {
this.ioExecutor = ioExecutor; this.ioExecutor = ioExecutor;
this.fileUtils = fileUtils; this.fileUtils = fileUtils;
this.callback = callback; this.callback = callback;
@@ -52,12 +51,12 @@ public abstract class FilePlugin implements SimplexPlugin {
return maxFrameLength; return maxFrameLength;
} }
public long getMaxLatency() { public int getMaxLatency() {
return maxLatency; return maxLatency;
} }
public long getMaxIdleTime() { public int getMaxIdleTime() {
return Long.MAX_VALUE; // We don't need keepalives return Integer.MAX_VALUE; // We don't need keepalives
} }
public boolean isRunning() { public boolean isRunning() {

View File

@@ -31,11 +31,11 @@ class FileTransportWriter implements TransportConnectionWriter {
return plugin.getMaxFrameLength(); return plugin.getMaxFrameLength();
} }
public long getMaxLatency() { public int getMaxLatency() {
return plugin.getMaxLatency(); return plugin.getMaxLatency();
} }
public long getMaxIdleTime() { public int getMaxIdleTime() {
return plugin.getMaxIdleTime(); return plugin.getMaxIdleTime();
} }

View File

@@ -17,8 +17,8 @@ class LanTcpPlugin extends TcpPlugin {
static final TransportId ID = new TransportId("lan"); static final TransportId ID = new TransportId("lan");
LanTcpPlugin(Executor ioExecutor, DuplexPluginCallback callback, LanTcpPlugin(Executor ioExecutor, DuplexPluginCallback callback,
int maxFrameLength, long maxLatency, long maxIdleTime, int maxFrameLength, int maxLatency, int maxIdleTime,
long pollingInterval) { int pollingInterval) {
super(ioExecutor, callback, maxFrameLength, maxLatency, maxIdleTime, super(ioExecutor, callback, maxFrameLength, maxLatency, maxIdleTime,
pollingInterval); pollingInterval);
} }

View File

@@ -10,9 +10,9 @@ import org.briarproject.api.plugins.duplex.DuplexPluginFactory;
public class LanTcpPluginFactory implements DuplexPluginFactory { public class LanTcpPluginFactory implements DuplexPluginFactory {
private static final int MAX_FRAME_LENGTH = 1024; private static final int MAX_FRAME_LENGTH = 1024;
private static final long MAX_LATENCY = 60 * 1000; // 1 minute private static final int MAX_LATENCY = 30 * 1000; // 30 seconds
private static final long MAX_IDLE_TIME = 30 * 1000; // 30 seconds private static final int MAX_IDLE_TIME = 30 * 1000; // 30 seconds
private static final long POLLING_INTERVAL = 60 * 1000; // 1 minute private static final int POLLING_INTERVAL = 3 * 60 * 1000; // 3 minutes
private final Executor ioExecutor; private final Executor ioExecutor;

View File

@@ -37,8 +37,8 @@ abstract class TcpPlugin implements DuplexPlugin {
protected final Executor ioExecutor; protected final Executor ioExecutor;
protected final DuplexPluginCallback callback; protected final DuplexPluginCallback callback;
protected final int maxFrameLength, socketTimeout; protected final int maxFrameLength, maxLatency, maxIdleTime;
protected final long maxLatency, maxIdleTime, pollingInterval; protected final int pollingInterval, socketTimeout;
protected volatile boolean running = false; protected volatile boolean running = false;
protected volatile ServerSocket socket = null; protected volatile ServerSocket socket = null;
@@ -53,28 +53,28 @@ abstract class TcpPlugin implements DuplexPlugin {
protected abstract boolean isConnectable(InetSocketAddress remote); protected abstract boolean isConnectable(InetSocketAddress remote);
protected TcpPlugin(Executor ioExecutor, DuplexPluginCallback callback, protected TcpPlugin(Executor ioExecutor, DuplexPluginCallback callback,
int maxFrameLength, long maxLatency, long maxIdleTime, int maxFrameLength, int maxLatency, int maxIdleTime,
long pollingInterval) { int pollingInterval) {
this.ioExecutor = ioExecutor; this.ioExecutor = ioExecutor;
this.callback = callback; this.callback = callback;
this.maxFrameLength = maxFrameLength; this.maxFrameLength = maxFrameLength;
this.maxLatency = maxLatency; this.maxLatency = maxLatency;
this.maxIdleTime = maxIdleTime; this.maxIdleTime = maxIdleTime;
this.pollingInterval = pollingInterval; this.pollingInterval = pollingInterval;
if(2 * maxIdleTime > Integer.MAX_VALUE) if(maxIdleTime > Integer.MAX_VALUE / 2)
socketTimeout = Integer.MAX_VALUE; socketTimeout = Integer.MAX_VALUE;
else socketTimeout = (int) (2 * maxIdleTime); else socketTimeout = maxIdleTime * 2;
} }
public int getMaxFrameLength() { public int getMaxFrameLength() {
return maxFrameLength; return maxFrameLength;
} }
public long getMaxLatency() { public int getMaxLatency() {
return maxLatency; return maxLatency;
} }
public long getMaxIdleTime() { public int getMaxIdleTime() {
return maxIdleTime; return maxIdleTime;
} }
@@ -171,7 +171,7 @@ abstract class TcpPlugin implements DuplexPlugin {
return true; return true;
} }
public long getPollingInterval() { public int getPollingInterval() {
return pollingInterval; return pollingInterval;
} }

View File

@@ -63,11 +63,11 @@ class TcpTransportConnection implements DuplexTransportConnection {
return plugin.getMaxFrameLength(); return plugin.getMaxFrameLength();
} }
public long getMaxLatency() { public int getMaxLatency() {
return plugin.getMaxLatency(); return plugin.getMaxLatency();
} }
public long getMaxIdleTime() { public int getMaxIdleTime() {
return plugin.getMaxIdleTime(); return plugin.getMaxIdleTime();
} }

View File

@@ -20,9 +20,9 @@ class WanTcpPlugin extends TcpPlugin {
private volatile MappingResult mappingResult; private volatile MappingResult mappingResult;
WanTcpPlugin(Executor ioExecutor, DuplexPluginCallback callback, WanTcpPlugin(Executor ioExecutor, PortMapper portMapper,
int maxFrameLength, long maxLatency, long maxIdleTime, DuplexPluginCallback callback, int maxFrameLength, int maxLatency,
long pollingInterval, PortMapper portMapper) { int maxIdleTime, int pollingInterval) {
super(ioExecutor, callback, maxFrameLength, maxLatency, maxIdleTime, super(ioExecutor, callback, maxFrameLength, maxLatency, maxIdleTime,
pollingInterval); pollingInterval);
this.portMapper = portMapper; this.portMapper = portMapper;

View File

@@ -11,9 +11,9 @@ import org.briarproject.api.plugins.duplex.DuplexPluginFactory;
public class WanTcpPluginFactory implements DuplexPluginFactory { public class WanTcpPluginFactory implements DuplexPluginFactory {
private static final int MAX_FRAME_LENGTH = 1024; private static final int MAX_FRAME_LENGTH = 1024;
private static final long MAX_LATENCY = 60 * 1000; // 1 minute private static final int MAX_LATENCY = 30 * 1000; // 30 seconds
private static final long MAX_IDLE_TIME = 30 * 1000; // 30 seconds private static final int MAX_IDLE_TIME = 30 * 1000; // 30 seconds
private static final long POLLING_INTERVAL = 5 * 60 * 1000; // 5 minutes private static final int POLLING_INTERVAL = 5 * 60 * 1000; // 5 minutes
private final Executor ioExecutor; private final Executor ioExecutor;
private final ShutdownManager shutdownManager; private final ShutdownManager shutdownManager;
@@ -29,8 +29,8 @@ public class WanTcpPluginFactory implements DuplexPluginFactory {
} }
public DuplexPlugin createPlugin(DuplexPluginCallback callback) { public DuplexPlugin createPlugin(DuplexPluginCallback callback) {
return new WanTcpPlugin(ioExecutor, callback, MAX_FRAME_LENGTH, return new WanTcpPlugin(ioExecutor, new PortMapperImpl(shutdownManager),
MAX_LATENCY, MAX_IDLE_TIME, POLLING_INTERVAL, callback, MAX_FRAME_LENGTH, MAX_LATENCY, MAX_IDLE_TIME,
new PortMapperImpl(shutdownManager)); POLLING_INTERVAL);
} }
} }

View File

@@ -51,7 +51,7 @@ class KeyManagerImpl extends TimerTask implements KeyManager, EventListener {
private final Timer timer; private final Timer timer;
// All of the following are locking: this // All of the following are locking: this
private final Map<TransportId, Long> maxLatencies; private final Map<TransportId, Integer> maxLatencies;
private final Map<EndpointKey, TemporarySecret> oldSecrets; private final Map<EndpointKey, TemporarySecret> oldSecrets;
private final Map<EndpointKey, TemporarySecret> currentSecrets; private final Map<EndpointKey, TemporarySecret> currentSecrets;
private final Map<EndpointKey, TemporarySecret> newSecrets; private final Map<EndpointKey, TemporarySecret> newSecrets;
@@ -66,7 +66,7 @@ class KeyManagerImpl extends TimerTask implements KeyManager, EventListener {
this.tagRecogniser = tagRecogniser; this.tagRecogniser = tagRecogniser;
this.clock = clock; this.clock = clock;
this.timer = timer; this.timer = timer;
maxLatencies = new HashMap<TransportId, Long>(); maxLatencies = new HashMap<TransportId, Integer>();
oldSecrets = new HashMap<EndpointKey, TemporarySecret>(); oldSecrets = new HashMap<EndpointKey, TemporarySecret>();
currentSecrets = new HashMap<EndpointKey, TemporarySecret>(); currentSecrets = new HashMap<EndpointKey, TemporarySecret>();
newSecrets = new HashMap<EndpointKey, TemporarySecret>(); newSecrets = new HashMap<EndpointKey, TemporarySecret>();
@@ -116,7 +116,7 @@ class KeyManagerImpl extends TimerTask implements KeyManager, EventListener {
Collection<TemporarySecret> dead = new ArrayList<TemporarySecret>(); Collection<TemporarySecret> dead = new ArrayList<TemporarySecret>();
for(TemporarySecret s : secrets) { for(TemporarySecret s : secrets) {
// Discard the secret if the transport has been removed // Discard the secret if the transport has been removed
Long maxLatency = maxLatencies.get(s.getTransportId()); Integer maxLatency = maxLatencies.get(s.getTransportId());
if(maxLatency == null) { if(maxLatency == null) {
LOG.info("Discarding obsolete secret"); LOG.info("Discarding obsolete secret");
ByteUtils.erase(s.getSecret()); ByteUtils.erase(s.getSecret());
@@ -168,7 +168,7 @@ class KeyManagerImpl extends TimerTask implements KeyManager, EventListener {
Collection<TemporarySecret> created = new ArrayList<TemporarySecret>(); Collection<TemporarySecret> created = new ArrayList<TemporarySecret>();
for(Entry<EndpointKey, TemporarySecret> e : newest.entrySet()) { for(Entry<EndpointKey, TemporarySecret> e : newest.entrySet()) {
TemporarySecret s = e.getValue(); TemporarySecret s = e.getValue();
Long maxLatency = maxLatencies.get(s.getTransportId()); Integer maxLatency = maxLatencies.get(s.getTransportId());
if(maxLatency == null) throw new IllegalStateException(); if(maxLatency == null) throw new IllegalStateException();
// Work out which rotation period we're in // Work out which rotation period we're in
long elapsed = now - s.getEpoch(); long elapsed = now - s.getEpoch();
@@ -255,7 +255,7 @@ class KeyManagerImpl extends TimerTask implements KeyManager, EventListener {
return new StreamContext(c, t, secret, streamNumber, s.getAlice()); return new StreamContext(c, t, secret, streamNumber, s.getAlice());
} }
public synchronized void endpointAdded(Endpoint ep, long maxLatency, public synchronized void endpointAdded(Endpoint ep, int maxLatency,
byte[] initialSecret) { byte[] initialSecret) {
maxLatencies.put(ep.getTransportId(), maxLatency); maxLatencies.put(ep.getTransportId(), maxLatency);
// Work out which rotation period we're in // Work out which rotation period we're in

View File

@@ -46,8 +46,7 @@ class BluetoothPlugin implements DuplexPlugin {
private final Clock clock; private final Clock clock;
private final SecureRandom secureRandom; private final SecureRandom secureRandom;
private final DuplexPluginCallback callback; private final DuplexPluginCallback callback;
private final int maxFrameLength; private final int maxFrameLength, maxLatency, pollingInterval;
private final long maxLatency, pollingInterval;
private final Semaphore discoverySemaphore = new Semaphore(1); private final Semaphore discoverySemaphore = new Semaphore(1);
private volatile boolean running = false; private volatile boolean running = false;
@@ -55,8 +54,8 @@ class BluetoothPlugin implements DuplexPlugin {
private volatile LocalDevice localDevice = null; private volatile LocalDevice localDevice = null;
BluetoothPlugin(Executor ioExecutor, Clock clock, SecureRandom secureRandom, BluetoothPlugin(Executor ioExecutor, Clock clock, SecureRandom secureRandom,
DuplexPluginCallback callback, int maxFrameLength, long maxLatency, DuplexPluginCallback callback, int maxFrameLength, int maxLatency,
long pollingInterval) { int pollingInterval) {
this.ioExecutor = ioExecutor; this.ioExecutor = ioExecutor;
this.clock = clock; this.clock = clock;
this.secureRandom = secureRandom; this.secureRandom = secureRandom;
@@ -74,13 +73,13 @@ class BluetoothPlugin implements DuplexPlugin {
return maxFrameLength; return maxFrameLength;
} }
public long getMaxLatency() { public int getMaxLatency() {
return maxLatency; return maxLatency;
} }
public long getMaxIdleTime() { public int getMaxIdleTime() {
// Bluetooth detects dead connections so we don't need keepalives // Bluetooth detects dead connections so we don't need keepalives
return Long.MAX_VALUE; return Integer.MAX_VALUE;
} }
public boolean start() throws IOException { public boolean start() throws IOException {
@@ -186,7 +185,7 @@ class BluetoothPlugin implements DuplexPlugin {
return true; return true;
} }
public long getPollingInterval() { public int getPollingInterval() {
return pollingInterval; return pollingInterval;
} }

View File

@@ -13,8 +13,8 @@ import org.briarproject.system.SystemClock;
public class BluetoothPluginFactory implements DuplexPluginFactory { public class BluetoothPluginFactory implements DuplexPluginFactory {
private static final int MAX_FRAME_LENGTH = 1024; private static final int MAX_FRAME_LENGTH = 1024;
private static final long MAX_LATENCY = 60 * 1000; // 1 minute private static final int MAX_LATENCY = 30 * 1000; // 30 seconds
private static final long POLLING_INTERVAL = 3 * 60 * 1000; // 3 minutes private static final int POLLING_INTERVAL = 3 * 60 * 1000; // 3 minutes
private final Executor ioExecutor; private final Executor ioExecutor;
private final SecureRandom secureRandom; private final SecureRandom secureRandom;

View File

@@ -64,11 +64,11 @@ class BluetoothTransportConnection implements DuplexTransportConnection {
return plugin.getMaxFrameLength(); return plugin.getMaxFrameLength();
} }
public long getMaxLatency() { public int getMaxLatency() {
return plugin.getMaxLatency(); return plugin.getMaxLatency();
} }
public long getMaxIdleTime() { public int getMaxIdleTime() {
return plugin.getMaxIdleTime(); return plugin.getMaxIdleTime();
} }

View File

@@ -13,14 +13,14 @@ class PollingRemovableDriveMonitor implements RemovableDriveMonitor, Runnable {
private final Executor ioExecutor; private final Executor ioExecutor;
private final RemovableDriveFinder finder; private final RemovableDriveFinder finder;
private final long pollingInterval; private final int pollingInterval;
private final Object pollingLock = new Object(); private final Object pollingLock = new Object();
private volatile boolean running = false; private volatile boolean running = false;
private volatile Callback callback = null; private volatile Callback callback = null;
public PollingRemovableDriveMonitor(Executor ioExecutor, public PollingRemovableDriveMonitor(Executor ioExecutor,
RemovableDriveFinder finder, long pollingInterval) { RemovableDriveFinder finder, int pollingInterval) {
this.ioExecutor = ioExecutor; this.ioExecutor = ioExecutor;
this.finder = finder; this.finder = finder;
this.pollingInterval = pollingInterval; this.pollingInterval = pollingInterval;

View File

@@ -29,7 +29,7 @@ implements RemovableDriveMonitor.Callback {
RemovableDrivePlugin(Executor ioExecutor, FileUtils fileUtils, RemovableDrivePlugin(Executor ioExecutor, FileUtils fileUtils,
SimplexPluginCallback callback, RemovableDriveFinder finder, SimplexPluginCallback callback, RemovableDriveFinder finder,
RemovableDriveMonitor monitor, int maxFrameLength, long maxLatency) { RemovableDriveMonitor monitor, int maxFrameLength, int maxLatency) {
super(ioExecutor, fileUtils, callback, maxFrameLength, maxLatency); super(ioExecutor, fileUtils, callback, maxFrameLength, maxLatency);
this.finder = finder; this.finder = finder;
this.monitor = monitor; this.monitor = monitor;
@@ -54,7 +54,7 @@ implements RemovableDriveMonitor.Callback {
return false; return false;
} }
public long getPollingInterval() { public int getPollingInterval() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }

View File

@@ -14,8 +14,8 @@ import org.briarproject.util.OsUtils;
public class RemovableDrivePluginFactory implements SimplexPluginFactory { public class RemovableDrivePluginFactory implements SimplexPluginFactory {
// Maximum latency 14 days (Royal Mail or lackadaisical carrier pigeon) // Maximum latency 14 days (Royal Mail or lackadaisical carrier pigeon)
private static final long MAX_LATENCY = 14 * 24 * 60 * 60 * 1000; private static final int MAX_LATENCY = 14 * 24 * 60 * 60 * 1000;
private static final long POLLING_INTERVAL = 10 * 1000; // 10 seconds private static final int POLLING_INTERVAL = 10 * 1000; // 10 seconds
private final Executor ioExecutor; private final Executor ioExecutor;
private final FileUtils fileUtils; private final FileUtils fileUtils;

View File

@@ -32,21 +32,18 @@ class ModemPlugin implements DuplexPlugin, Modem.Callback {
private final ModemFactory modemFactory; private final ModemFactory modemFactory;
private final SerialPortList serialPortList; private final SerialPortList serialPortList;
private final DuplexPluginCallback callback; private final DuplexPluginCallback callback;
private final int maxFrameLength; private final int maxFrameLength, maxLatency;
private final long maxLatency, pollingInterval;
private volatile boolean running = false; private volatile boolean running = false;
private volatile Modem modem = null; private volatile Modem modem = null;
ModemPlugin(ModemFactory modemFactory, SerialPortList serialPortList, ModemPlugin(ModemFactory modemFactory, SerialPortList serialPortList,
DuplexPluginCallback callback, int maxFrameLength, long maxLatency, DuplexPluginCallback callback, int maxFrameLength, int maxLatency) {
long pollingInterval) {
this.modemFactory = modemFactory; this.modemFactory = modemFactory;
this.serialPortList = serialPortList; this.serialPortList = serialPortList;
this.callback = callback; this.callback = callback;
this.maxFrameLength = maxFrameLength; this.maxFrameLength = maxFrameLength;
this.maxLatency = maxLatency; this.maxLatency = maxLatency;
this.pollingInterval = pollingInterval;
} }
public TransportId getId() { public TransportId getId() {
@@ -57,13 +54,13 @@ class ModemPlugin implements DuplexPlugin, Modem.Callback {
return maxFrameLength; return maxFrameLength;
} }
public long getMaxLatency() { public int getMaxLatency() {
return maxLatency; return maxLatency;
} }
public long getMaxIdleTime() { public int getMaxIdleTime() {
// FIXME: Do we need keepalives for this transport? // FIXME: Do we need keepalives for this transport?
return Long.MAX_VALUE; return Integer.MAX_VALUE;
} }
public boolean start() { public boolean start() {
@@ -103,8 +100,8 @@ class ModemPlugin implements DuplexPlugin, Modem.Callback {
return false; return false;
} }
public long getPollingInterval() { public int getPollingInterval() {
return pollingInterval; throw new UnsupportedOperationException();
} }
public void poll(Collection<ContactId> connected) { public void poll(Collection<ContactId> connected) {
@@ -226,11 +223,11 @@ class ModemPlugin implements DuplexPlugin, Modem.Callback {
return getMaxFrameLength(); return getMaxFrameLength();
} }
public long getMaxLatency() { public int getMaxLatency() {
return getMaxLatency(); return getMaxLatency();
} }
public long getMaxIdleTime() { public int getMaxIdleTime() {
return getMaxIdleTime(); return getMaxIdleTime();
} }

View File

@@ -12,8 +12,7 @@ import org.briarproject.util.StringUtils;
public class ModemPluginFactory implements DuplexPluginFactory { public class ModemPluginFactory implements DuplexPluginFactory {
private static final int MAX_FRAME_LENGTH = 1024; private static final int MAX_FRAME_LENGTH = 1024;
private static final long MAX_LATENCY = 60 * 1000; // 1 minute private static final int MAX_LATENCY = 30 * 1000; // 30 seconds
private static final long POLLING_INTERVAL = 60 * 60 * 1000; // 1 hour
private final ModemFactory modemFactory; private final ModemFactory modemFactory;
private final SerialPortList serialPortList; private final SerialPortList serialPortList;
@@ -33,6 +32,6 @@ public class ModemPluginFactory implements DuplexPluginFactory {
String enabled = callback.getConfig().get("enabled"); String enabled = callback.getConfig().get("enabled");
if(StringUtils.isNullOrEmpty(enabled)) return null; if(StringUtils.isNullOrEmpty(enabled)) return null;
return new ModemPlugin(modemFactory, serialPortList, callback, return new ModemPlugin(modemFactory, serialPortList, callback,
MAX_FRAME_LENGTH, MAX_LATENCY, POLLING_INTERVAL); MAX_FRAME_LENGTH, MAX_LATENCY);
} }
} }

View File

@@ -75,6 +75,7 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
protected final Message message, message1; protected final Message message, message1;
protected final TransportId transportId; protected final TransportId transportId;
protected final TransportProperties transportProperties; protected final TransportProperties transportProperties;
protected final int maxLatency;
protected final ContactId contactId; protected final ContactId contactId;
protected final Contact contact; protected final Contact contact;
protected final Endpoint endpoint; protected final Endpoint endpoint;
@@ -102,6 +103,7 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
transportId = new TransportId("id"); transportId = new TransportId("id");
transportProperties = new TransportProperties(Collections.singletonMap( transportProperties = new TransportProperties(Collections.singletonMap(
"bar", "baz")); "bar", "baz"));
maxLatency = Integer.MAX_VALUE;
contactId = new ContactId(234); contactId = new ContactId(234);
contact = new Contact(contactId, author, localAuthorId); contact = new Contact(contactId, author, localAuthorId);
endpoint = new Endpoint(contactId, transportId, 123, true); endpoint = new Endpoint(contactId, transportId, 123, true);
@@ -691,11 +693,11 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
oneOf(database).getRawMessage(txn, messageId); oneOf(database).getRawMessage(txn, messageId);
will(returnValue(raw)); will(returnValue(raw));
oneOf(database).updateExpiryTime(txn, contactId, messageId, oneOf(database).updateExpiryTime(txn, contactId, messageId,
Long.MAX_VALUE); maxLatency);
oneOf(database).getRawMessage(txn, messageId1); oneOf(database).getRawMessage(txn, messageId1);
will(returnValue(raw1)); will(returnValue(raw1));
oneOf(database).updateExpiryTime(txn, contactId, messageId1, oneOf(database).updateExpiryTime(txn, contactId, messageId1,
Long.MAX_VALUE); maxLatency);
oneOf(database).lowerRequestedFlag(txn, contactId, ids); oneOf(database).lowerRequestedFlag(txn, contactId, ids);
oneOf(database).commitTransaction(txn); oneOf(database).commitTransaction(txn);
}}); }});
@@ -703,7 +705,7 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
eventBus, shutdown); eventBus, shutdown);
assertEquals(messages, db.generateBatch(contactId, size * 2, assertEquals(messages, db.generateBatch(contactId, size * 2,
Long.MAX_VALUE)); maxLatency));
context.assertIsSatisfied(); context.assertIsSatisfied();
} }
@@ -726,15 +728,15 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
oneOf(database).getMessagesToOffer(txn, contactId, 123); oneOf(database).getMessagesToOffer(txn, contactId, 123);
will(returnValue(ids)); will(returnValue(ids));
oneOf(database).updateExpiryTime(txn, contactId, messageId, oneOf(database).updateExpiryTime(txn, contactId, messageId,
Long.MAX_VALUE); maxLatency);
oneOf(database).updateExpiryTime(txn, contactId, messageId1, oneOf(database).updateExpiryTime(txn, contactId, messageId1,
Long.MAX_VALUE); maxLatency);
oneOf(database).commitTransaction(txn); oneOf(database).commitTransaction(txn);
}}); }});
DatabaseComponent db = createDatabaseComponent(database, cleaner, DatabaseComponent db = createDatabaseComponent(database, cleaner,
eventBus, shutdown); eventBus, shutdown);
Offer o = db.generateOffer(contactId, 123, Long.MAX_VALUE); Offer o = db.generateOffer(contactId, 123, maxLatency);
assertEquals(ids, o.getMessageIds()); assertEquals(ids, o.getMessageIds());
context.assertIsSatisfied(); context.assertIsSatisfied();
@@ -791,11 +793,11 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
oneOf(database).getRawMessage(txn, messageId); oneOf(database).getRawMessage(txn, messageId);
will(returnValue(raw)); will(returnValue(raw));
oneOf(database).updateExpiryTime(txn, contactId, messageId, oneOf(database).updateExpiryTime(txn, contactId, messageId,
Long.MAX_VALUE); maxLatency);
oneOf(database).getRawMessage(txn, messageId1); oneOf(database).getRawMessage(txn, messageId1);
will(returnValue(raw1)); will(returnValue(raw1));
oneOf(database).updateExpiryTime(txn, contactId, messageId1, oneOf(database).updateExpiryTime(txn, contactId, messageId1,
Long.MAX_VALUE); maxLatency);
oneOf(database).lowerRequestedFlag(txn, contactId, ids); oneOf(database).lowerRequestedFlag(txn, contactId, ids);
oneOf(database).commitTransaction(txn); oneOf(database).commitTransaction(txn);
}}); }});
@@ -803,7 +805,7 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
eventBus, shutdown); eventBus, shutdown);
assertEquals(messages, db.generateRequestedBatch(contactId, size * 2, assertEquals(messages, db.generateRequestedBatch(contactId, size * 2,
Long.MAX_VALUE)); maxLatency));
context.assertIsSatisfied(); context.assertIsSatisfied();
} }
@@ -821,14 +823,14 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
will(returnValue(txn)); will(returnValue(txn));
oneOf(database).containsContact(txn, contactId); oneOf(database).containsContact(txn, contactId);
will(returnValue(true)); will(returnValue(true));
oneOf(database).getRetentionUpdate(txn, contactId, Long.MAX_VALUE); oneOf(database).getRetentionUpdate(txn, contactId, maxLatency);
will(returnValue(null)); will(returnValue(null));
oneOf(database).commitTransaction(txn); oneOf(database).commitTransaction(txn);
}}); }});
DatabaseComponent db = createDatabaseComponent(database, cleaner, DatabaseComponent db = createDatabaseComponent(database, cleaner,
eventBus, shutdown); eventBus, shutdown);
assertNull(db.generateRetentionUpdate(contactId, Long.MAX_VALUE)); assertNull(db.generateRetentionUpdate(contactId, maxLatency));
context.assertIsSatisfied(); context.assertIsSatisfied();
} }
@@ -846,15 +848,14 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
will(returnValue(txn)); will(returnValue(txn));
oneOf(database).containsContact(txn, contactId); oneOf(database).containsContact(txn, contactId);
will(returnValue(true)); will(returnValue(true));
oneOf(database).getRetentionUpdate(txn, contactId, Long.MAX_VALUE); oneOf(database).getRetentionUpdate(txn, contactId, maxLatency);
will(returnValue(new RetentionUpdate(0, 1))); will(returnValue(new RetentionUpdate(0, 1)));
oneOf(database).commitTransaction(txn); oneOf(database).commitTransaction(txn);
}}); }});
DatabaseComponent db = createDatabaseComponent(database, cleaner, DatabaseComponent db = createDatabaseComponent(database, cleaner,
eventBus, shutdown); eventBus, shutdown);
RetentionUpdate u = db.generateRetentionUpdate(contactId, RetentionUpdate u = db.generateRetentionUpdate(contactId, maxLatency);
Long.MAX_VALUE);
assertEquals(0, u.getRetentionTime()); assertEquals(0, u.getRetentionTime());
assertEquals(1, u.getVersion()); assertEquals(1, u.getVersion());
@@ -874,15 +875,14 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
will(returnValue(txn)); will(returnValue(txn));
oneOf(database).containsContact(txn, contactId); oneOf(database).containsContact(txn, contactId);
will(returnValue(true)); will(returnValue(true));
oneOf(database).getSubscriptionUpdate(txn, contactId, oneOf(database).getSubscriptionUpdate(txn, contactId, maxLatency);
Long.MAX_VALUE);
will(returnValue(null)); will(returnValue(null));
oneOf(database).commitTransaction(txn); oneOf(database).commitTransaction(txn);
}}); }});
DatabaseComponent db = createDatabaseComponent(database, cleaner, DatabaseComponent db = createDatabaseComponent(database, cleaner,
eventBus, shutdown); eventBus, shutdown);
assertNull(db.generateSubscriptionUpdate(contactId, Long.MAX_VALUE)); assertNull(db.generateSubscriptionUpdate(contactId, maxLatency));
context.assertIsSatisfied(); context.assertIsSatisfied();
} }
@@ -900,8 +900,7 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
will(returnValue(txn)); will(returnValue(txn));
oneOf(database).containsContact(txn, contactId); oneOf(database).containsContact(txn, contactId);
will(returnValue(true)); will(returnValue(true));
oneOf(database).getSubscriptionUpdate(txn, contactId, oneOf(database).getSubscriptionUpdate(txn, contactId, maxLatency);
Long.MAX_VALUE);
will(returnValue(new SubscriptionUpdate(Arrays.asList(group), 1))); will(returnValue(new SubscriptionUpdate(Arrays.asList(group), 1)));
oneOf(database).commitTransaction(txn); oneOf(database).commitTransaction(txn);
}}); }});
@@ -909,7 +908,7 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
eventBus, shutdown); eventBus, shutdown);
SubscriptionUpdate u = db.generateSubscriptionUpdate(contactId, SubscriptionUpdate u = db.generateSubscriptionUpdate(contactId,
Long.MAX_VALUE); maxLatency);
assertEquals(Arrays.asList(group), u.getGroups()); assertEquals(Arrays.asList(group), u.getGroups());
assertEquals(1, u.getVersion()); assertEquals(1, u.getVersion());
@@ -929,14 +928,14 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
will(returnValue(txn)); will(returnValue(txn));
oneOf(database).containsContact(txn, contactId); oneOf(database).containsContact(txn, contactId);
will(returnValue(true)); will(returnValue(true));
oneOf(database).getTransportUpdates(txn, contactId, Long.MAX_VALUE); oneOf(database).getTransportUpdates(txn, contactId, maxLatency);
will(returnValue(null)); will(returnValue(null));
oneOf(database).commitTransaction(txn); oneOf(database).commitTransaction(txn);
}}); }});
DatabaseComponent db = createDatabaseComponent(database, cleaner, DatabaseComponent db = createDatabaseComponent(database, cleaner,
eventBus, shutdown); eventBus, shutdown);
assertNull(db.generateTransportUpdates(contactId, Long.MAX_VALUE)); assertNull(db.generateTransportUpdates(contactId, maxLatency));
context.assertIsSatisfied(); context.assertIsSatisfied();
} }
@@ -954,7 +953,7 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
will(returnValue(txn)); will(returnValue(txn));
oneOf(database).containsContact(txn, contactId); oneOf(database).containsContact(txn, contactId);
will(returnValue(true)); will(returnValue(true));
oneOf(database).getTransportUpdates(txn, contactId, Long.MAX_VALUE); oneOf(database).getTransportUpdates(txn, contactId, maxLatency);
will(returnValue(Arrays.asList(new TransportUpdate(transportId, will(returnValue(Arrays.asList(new TransportUpdate(transportId,
transportProperties, 1)))); transportProperties, 1))));
oneOf(database).commitTransaction(txn); oneOf(database).commitTransaction(txn);
@@ -963,7 +962,7 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
eventBus, shutdown); eventBus, shutdown);
Collection<TransportUpdate> updates = Collection<TransportUpdate> updates =
db.generateTransportUpdates(contactId, Long.MAX_VALUE); db.generateTransportUpdates(contactId, maxLatency);
assertNotNull(updates); assertNotNull(updates);
assertEquals(1, updates.size()); assertEquals(1, updates.size());
TransportUpdate u = updates.iterator().next(); TransportUpdate u = updates.iterator().next();

View File

@@ -32,32 +32,30 @@ public class ExponentialBackoffTest extends BriarTestCase {
assertEquals(now, fromNow - fromZero); assertEquals(now, fromNow - fromZero);
} }
@Test
public void testRoundTripTimeOverflow() {
long maxLatency = Long.MAX_VALUE / 2 + 1; // RTT will overflow
long expiry = ExponentialBackoff.calculateExpiry(0, maxLatency, 0);
assertEquals(Long.MAX_VALUE, expiry); // Overflow caught
}
@Test @Test
public void testTransmissionCountOverflow() { public void testTransmissionCountOverflow() {
long maxLatency = (Long.MAX_VALUE - 1) / 2; // RTT will not overflow int maxLatency = Integer.MAX_VALUE; // RTT will not overflow
long expiry = ExponentialBackoff.calculateExpiry(0, maxLatency, 0); long expiry = ExponentialBackoff.calculateExpiry(0, maxLatency, 0);
assertEquals(Long.MAX_VALUE - 1, expiry); // No overflow assertEquals(Integer.MAX_VALUE * 2L, expiry); // No overflow
expiry = ExponentialBackoff.calculateExpiry(0, maxLatency, 1); expiry = ExponentialBackoff.calculateExpiry(0, maxLatency, 31);
assertEquals(Integer.MAX_VALUE * (2L << 31), expiry); // No overflow
expiry = ExponentialBackoff.calculateExpiry(0, maxLatency, 32);
assertEquals(Long.MAX_VALUE, expiry); // Overflow caught assertEquals(Long.MAX_VALUE, expiry); // Overflow caught
expiry = ExponentialBackoff.calculateExpiry(0, maxLatency, 2); expiry = ExponentialBackoff.calculateExpiry(0, maxLatency, 33);
assertEquals(Long.MAX_VALUE, expiry); // Overflow caught assertEquals(Long.MAX_VALUE, expiry); // Overflow caught
} }
@Test @Test
public void testCurrentTimeOverflow() { public void testCurrentTimeOverflow() {
long maxLatency = (Long.MAX_VALUE - 1) / 2; // RTT will not overflow int maxLatency = Integer.MAX_VALUE; // RTT will not overflow
long expiry = ExponentialBackoff.calculateExpiry(0, maxLatency, 0); long now = Long.MAX_VALUE - (Integer.MAX_VALUE * (2L << 31));
long expiry = ExponentialBackoff.calculateExpiry(now, maxLatency, 0);
assertEquals(now + Integer.MAX_VALUE * 2L, expiry); // No overflow
expiry = ExponentialBackoff.calculateExpiry(now - 1, maxLatency, 31);
assertEquals(Long.MAX_VALUE - 1, expiry); // No overflow assertEquals(Long.MAX_VALUE - 1, expiry); // No overflow
expiry = ExponentialBackoff.calculateExpiry(1, maxLatency, 0); expiry = ExponentialBackoff.calculateExpiry(now, maxLatency, 31);
assertEquals(Long.MAX_VALUE, expiry); // No overflow assertEquals(Long.MAX_VALUE, expiry); // No overflow
expiry = ExponentialBackoff.calculateExpiry(2, maxLatency, 0); expiry = ExponentialBackoff.calculateExpiry(now + 1, maxLatency, 32);
assertEquals(Long.MAX_VALUE, expiry); // Overflow caught assertEquals(Long.MAX_VALUE, expiry); // Overflow caught
} }
} }

View File

@@ -381,7 +381,7 @@ public class H2DatabaseTest extends BriarTestCase {
assertTrue(it.hasNext()); assertTrue(it.hasNext());
assertEquals(messageId, it.next()); assertEquals(messageId, it.next());
assertFalse(it.hasNext()); assertFalse(it.hasNext());
db.updateExpiryTime(txn, contactId, messageId, Long.MAX_VALUE); db.updateExpiryTime(txn, contactId, messageId, Integer.MAX_VALUE);
// The message should no longer be sendable // The message should no longer be sendable
it = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE).iterator(); it = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE).iterator();
@@ -1109,7 +1109,8 @@ public class H2DatabaseTest extends BriarTestCase {
@Test @Test
public void testTemporarySecrets() throws Exception { public void testTemporarySecrets() throws Exception {
// Create an endpoint and four consecutive temporary secrets // Create an endpoint and four consecutive temporary secrets
long epoch = 123, latency = 234; long epoch = 123;
int latency = 234;
boolean alice = false; boolean alice = false;
long outgoing1 = 345, centre1 = 456; long outgoing1 = 345, centre1 = 456;
long outgoing2 = 567, centre2 = 678; long outgoing2 = 567, centre2 = 678;
@@ -1235,7 +1236,8 @@ public class H2DatabaseTest extends BriarTestCase {
@Test @Test
public void testIncrementStreamCounter() throws Exception { public void testIncrementStreamCounter() throws Exception {
// Create an endpoint and a temporary secret // Create an endpoint and a temporary secret
long epoch = 123, latency = 234; long epoch = 123;
int latency = 234;
boolean alice = false; boolean alice = false;
long period = 345, outgoing = 456, centre = 567; long period = 345, outgoing = 456, centre = 567;
Endpoint ep = new Endpoint(contactId, transportId, epoch, alice); Endpoint ep = new Endpoint(contactId, transportId, epoch, alice);
@@ -1290,7 +1292,8 @@ public class H2DatabaseTest extends BriarTestCase {
@Test @Test
public void testSetReorderingWindow() throws Exception { public void testSetReorderingWindow() throws Exception {
// Create an endpoint and a temporary secret // Create an endpoint and a temporary secret
long epoch = 123, latency = 234; long epoch = 123;
int latency = 234;
boolean alice = false; boolean alice = false;
long period = 345, outgoing = 456, centre = 567; long period = 345, outgoing = 456, centre = 567;
Endpoint ep = new Endpoint(contactId, transportId, epoch, alice); Endpoint ep = new Endpoint(contactId, transportId, epoch, alice);
@@ -1359,8 +1362,8 @@ public class H2DatabaseTest extends BriarTestCase {
@Test @Test
public void testEndpoints() throws Exception { public void testEndpoints() throws Exception {
// Create some endpoints // Create some endpoints
long epoch1 = 123, latency1 = 234; long epoch1 = 123, epoch2 = 234;
long epoch2 = 345, latency2 = 456; int latency1 = 345, latency2 = 456;
boolean alice1 = true, alice2 = false; boolean alice1 = true, alice2 = false;
TransportId transportId1 = new TransportId("bar"); TransportId transportId1 = new TransportId("bar");
TransportId transportId2 = new TransportId("baz"); TransportId transportId2 = new TransportId("baz");

View File

@@ -2,6 +2,7 @@ package org.briarproject.messaging;
import static org.briarproject.api.AuthorConstants.MAX_PUBLIC_KEY_LENGTH; import static org.briarproject.api.AuthorConstants.MAX_PUBLIC_KEY_LENGTH;
import static org.briarproject.api.messaging.MessagingConstants.GROUP_SALT_LENGTH; import static org.briarproject.api.messaging.MessagingConstants.GROUP_SALT_LENGTH;
import static org.briarproject.api.transport.TransportConstants.MAX_CLOCK_DIFFERENCE;
import static org.briarproject.api.transport.TransportConstants.MAX_FRAME_LENGTH; import static org.briarproject.api.transport.TransportConstants.MAX_FRAME_LENGTH;
import static org.briarproject.api.transport.TransportConstants.TAG_LENGTH; import static org.briarproject.api.transport.TransportConstants.TAG_LENGTH;
@@ -32,7 +33,9 @@ import org.briarproject.api.messaging.Message;
import org.briarproject.api.messaging.MessageFactory; import org.briarproject.api.messaging.MessageFactory;
import org.briarproject.api.messaging.MessageVerifier; import org.briarproject.api.messaging.MessageVerifier;
import org.briarproject.api.messaging.MessagingSession; import org.briarproject.api.messaging.MessagingSession;
import org.briarproject.api.messaging.PacketReader;
import org.briarproject.api.messaging.PacketReaderFactory; import org.briarproject.api.messaging.PacketReaderFactory;
import org.briarproject.api.messaging.PacketWriter;
import org.briarproject.api.messaging.PacketWriterFactory; import org.briarproject.api.messaging.PacketWriterFactory;
import org.briarproject.api.transport.Endpoint; import org.briarproject.api.transport.Endpoint;
import org.briarproject.api.transport.StreamContext; import org.briarproject.api.transport.StreamContext;
@@ -56,8 +59,9 @@ import com.google.inject.Injector;
public class SimplexMessagingIntegrationTest extends BriarTestCase { public class SimplexMessagingIntegrationTest extends BriarTestCase {
private static final long CLOCK_DIFFERENCE = 60 * 1000; private static final int MAX_LATENCY = 2 * 60 * 1000; // 2 minutes
private static final long LATENCY = 60 * 1000; private static final int ROTATION_PERIOD =
MAX_CLOCK_DIFFERENCE + MAX_LATENCY;
private final File testDir = TestUtils.getTestDirectory(); private final File testDir = TestUtils.getTestDirectory();
private final File aliceDir = new File(testDir, "alice"); private final File aliceDir = new File(testDir, "alice");
@@ -73,8 +77,7 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
// Create matching secrets for Alice and Bob // Create matching secrets for Alice and Bob
initialSecret = new byte[32]; initialSecret = new byte[32];
new Random().nextBytes(initialSecret); new Random().nextBytes(initialSecret);
long rotationPeriod = 2 * CLOCK_DIFFERENCE + LATENCY; epoch = System.currentTimeMillis() - 2 * ROTATION_PERIOD;
epoch = System.currentTimeMillis() - 2 * rotationPeriod;
} }
@Override @Override
@@ -121,10 +124,10 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
db.addGroup(group); db.addGroup(group);
db.setInboxGroup(contactId, group); db.setInboxGroup(contactId, group);
// Add the transport and the endpoint // Add the transport and the endpoint
db.addTransport(transportId, LATENCY); db.addTransport(transportId, MAX_LATENCY);
Endpoint ep = new Endpoint(contactId, transportId, epoch, true); Endpoint ep = new Endpoint(contactId, transportId, epoch, true);
db.addEndpoint(ep); db.addEndpoint(ep);
keyManager.endpointAdded(ep, LATENCY, initialSecret.clone()); keyManager.endpointAdded(ep, MAX_LATENCY, initialSecret.clone());
// Send Bob a message // Send Bob a message
String contentType = "text/plain"; String contentType = "text/plain";
long timestamp = System.currentTimeMillis(); long timestamp = System.currentTimeMillis();
@@ -145,10 +148,11 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
EventBus eventBus = alice.getInstance(EventBus.class); EventBus eventBus = alice.getInstance(EventBus.class);
PacketWriterFactory packetWriterFactory = PacketWriterFactory packetWriterFactory =
alice.getInstance(PacketWriterFactory.class); alice.getInstance(PacketWriterFactory.class);
MessagingSession session = new SimplexOutgoingSession(db, PacketWriter packetWriter = packetWriterFactory.createPacketWriter(
new ImmediateExecutor(), eventBus, packetWriterFactory,
contactId, transportId, Long.MAX_VALUE,
streamWriter.getOutputStream()); streamWriter.getOutputStream());
MessagingSession session = new SimplexOutgoingSession(db,
new ImmediateExecutor(), eventBus, contactId, transportId,
MAX_LATENCY, packetWriter);
// Write whatever needs to be written // Write whatever needs to be written
session.run(); session.run();
streamWriter.getOutputStream().close(); streamWriter.getOutputStream().close();
@@ -182,10 +186,10 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
db.addGroup(group); db.addGroup(group);
db.setInboxGroup(contactId, group); db.setInboxGroup(contactId, group);
// Add the transport and the endpoint // Add the transport and the endpoint
db.addTransport(transportId, LATENCY); db.addTransport(transportId, MAX_LATENCY);
Endpoint ep = new Endpoint(contactId, transportId, epoch, false); Endpoint ep = new Endpoint(contactId, transportId, epoch, false);
db.addEndpoint(ep); db.addEndpoint(ep);
keyManager.endpointAdded(ep, LATENCY, initialSecret.clone()); keyManager.endpointAdded(ep, MAX_LATENCY, initialSecret.clone());
// Set up an event listener // Set up an event listener
MessageListener listener = new MessageListener(); MessageListener listener = new MessageListener();
bob.getInstance(EventBus.class).addListener(listener); bob.getInstance(EventBus.class).addListener(listener);
@@ -208,10 +212,11 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
bob.getInstance(MessageVerifier.class); bob.getInstance(MessageVerifier.class);
PacketReaderFactory packetReaderFactory = PacketReaderFactory packetReaderFactory =
bob.getInstance(PacketReaderFactory.class); bob.getInstance(PacketReaderFactory.class);
PacketReader packetReader = packetReaderFactory.createPacketReader(
streamReader.getInputStream());
MessagingSession session = new IncomingSession(db, MessagingSession session = new IncomingSession(db,
new ImmediateExecutor(), new ImmediateExecutor(), eventBus, new ImmediateExecutor(), new ImmediateExecutor(), eventBus,
messageVerifier, packetReaderFactory, contactId, transportId, messageVerifier, contactId, transportId, packetReader);
streamReader.getInputStream());
// No messages should have been added yet // No messages should have been added yet
assertFalse(listener.messageAdded); assertFalse(listener.messageAdded);
// Read whatever needs to be read // Read whatever needs to be read

View File

@@ -1,72 +1,53 @@
package org.briarproject.messaging; package org.briarproject.messaging;
import java.io.ByteArrayOutputStream;
import java.util.Arrays; import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import org.briarproject.BriarTestCase; import org.briarproject.BriarTestCase;
import org.briarproject.TestUtils; import org.briarproject.TestUtils;
import org.briarproject.api.ContactId; import org.briarproject.api.ContactId;
import org.briarproject.api.TransportId; import org.briarproject.api.TransportId;
import org.briarproject.api.UniqueId;
import org.briarproject.api.db.DatabaseComponent; import org.briarproject.api.db.DatabaseComponent;
import org.briarproject.api.event.EventBus; import org.briarproject.api.event.EventBus;
import org.briarproject.api.messaging.Ack; import org.briarproject.api.messaging.Ack;
import org.briarproject.api.messaging.MessageId; import org.briarproject.api.messaging.MessageId;
import org.briarproject.api.messaging.PacketWriterFactory; import org.briarproject.api.messaging.PacketWriter;
import org.briarproject.plugins.ImmediateExecutor; import org.briarproject.plugins.ImmediateExecutor;
import org.briarproject.serial.SerialModule;
import org.jmock.Expectations; import org.jmock.Expectations;
import org.jmock.Mockery; import org.jmock.Mockery;
import org.junit.Test; import org.junit.Test;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
public class SimplexOutgoingSessionTest extends BriarTestCase { public class SimplexOutgoingSessionTest extends BriarTestCase {
// FIXME: This is an integration test, not a unit test private static final int MAX_MESSAGES_PER_ACK = 10;
private final Mockery context; private final Mockery context;
private final DatabaseComponent db; private final DatabaseComponent db;
private final Executor dbExecutor; private final Executor dbExecutor;
private final EventBus eventBus; private final EventBus eventBus;
private final PacketWriterFactory packetWriterFactory;
private final ContactId contactId; private final ContactId contactId;
private final TransportId transportId; private final TransportId transportId;
private final MessageId messageId; private final MessageId messageId;
private final byte[] secret; private final int maxLatency;
private final PacketWriter packetWriter;
public SimplexOutgoingSessionTest() { public SimplexOutgoingSessionTest() {
context = new Mockery(); context = new Mockery();
db = context.mock(DatabaseComponent.class); db = context.mock(DatabaseComponent.class);
dbExecutor = new ImmediateExecutor(); dbExecutor = new ImmediateExecutor();
Module testModule = new AbstractModule() {
@Override
public void configure() {
bind(PacketWriterFactory.class).to(
PacketWriterFactoryImpl.class);
}
};
Injector i = Guice.createInjector(testModule, new SerialModule());
eventBus = context.mock(EventBus.class); eventBus = context.mock(EventBus.class);
packetWriterFactory = i.getInstance(PacketWriterFactory.class); packetWriter = context.mock(PacketWriter.class);
contactId = new ContactId(234); contactId = new ContactId(234);
transportId = new TransportId("id"); transportId = new TransportId("id");
messageId = new MessageId(TestUtils.getRandomId()); messageId = new MessageId(TestUtils.getRandomId());
secret = new byte[32]; maxLatency = Integer.MAX_VALUE;
new Random().nextBytes(secret);
} }
@Test @Test
public void testNothingToSend() throws Exception { public void testNothingToSend() throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
final SimplexOutgoingSession session = new SimplexOutgoingSession(db, final SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, packetWriterFactory, contactId, dbExecutor, eventBus, contactId, transportId, maxLatency,
transportId, Long.MAX_VALUE, out); packetWriter);
context.checking(new Expectations() {{ context.checking(new Expectations() {{
// Add listener // Add listener
oneOf(eventBus).addListener(session); oneOf(eventBus).addListener(session);
@@ -74,46 +55,45 @@ public class SimplexOutgoingSessionTest extends BriarTestCase {
oneOf(db).generateTransportAcks(contactId); oneOf(db).generateTransportAcks(contactId);
will(returnValue(null)); will(returnValue(null));
// No transport updates to send // No transport updates to send
oneOf(db).generateTransportUpdates(with(contactId), oneOf(db).generateTransportUpdates(contactId, maxLatency);
with(any(long.class)));
will(returnValue(null)); will(returnValue(null));
// No subscription ack to send // No subscription ack to send
oneOf(db).generateSubscriptionAck(contactId); oneOf(db).generateSubscriptionAck(contactId);
will(returnValue(null)); will(returnValue(null));
// No subscription update to send // No subscription update to send
oneOf(db).generateSubscriptionUpdate(with(contactId), oneOf(db).generateSubscriptionUpdate(contactId, maxLatency);
with(any(long.class)));
will(returnValue(null)); will(returnValue(null));
// No retention ack to send // No retention ack to send
oneOf(db).generateRetentionAck(contactId); oneOf(db).generateRetentionAck(contactId);
will(returnValue(null)); will(returnValue(null));
// No retention update to send // No retention update to send
oneOf(db).generateRetentionUpdate(with(contactId), oneOf(db).generateRetentionUpdate(contactId, maxLatency);
with(any(long.class)));
will(returnValue(null)); will(returnValue(null));
// No acks to send // No acks to send
oneOf(db).generateAck(with(contactId), with(any(int.class))); oneOf(packetWriter).getMaxMessagesForAck(with(any(long.class)));
will(returnValue(MAX_MESSAGES_PER_ACK));
oneOf(db).generateAck(contactId, MAX_MESSAGES_PER_ACK);
will(returnValue(null)); will(returnValue(null));
// No messages to send // No messages to send
oneOf(db).generateBatch(with(contactId), with(any(int.class)), oneOf(db).generateBatch(with(contactId), with(any(int.class)),
with(any(long.class))); with(maxLatency));
will(returnValue(null)); will(returnValue(null));
// Flush the output stream
oneOf(packetWriter).flush();
// Remove listener // Remove listener
oneOf(eventBus).removeListener(session); oneOf(eventBus).removeListener(session);
}}); }});
session.run(); session.run();
// Nothing should have been written
assertEquals(0, out.size());
context.assertIsSatisfied(); context.assertIsSatisfied();
} }
@Test @Test
public void testSomethingToSend() throws Exception { public void testSomethingToSend() throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream(); final Ack ack = new Ack(Arrays.asList(messageId));
final SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, packetWriterFactory, contactId,
transportId, Long.MAX_VALUE, out);
final byte[] raw = new byte[1234]; final byte[] raw = new byte[1234];
final SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, transportId, maxLatency,
packetWriter);
context.checking(new Expectations() {{ context.checking(new Expectations() {{
// Add listener // Add listener
oneOf(eventBus).addListener(session); oneOf(eventBus).addListener(session);
@@ -121,43 +101,46 @@ public class SimplexOutgoingSessionTest extends BriarTestCase {
oneOf(db).generateTransportAcks(contactId); oneOf(db).generateTransportAcks(contactId);
will(returnValue(null)); will(returnValue(null));
// No transport updates to send // No transport updates to send
oneOf(db).generateTransportUpdates(with(contactId), oneOf(db).generateTransportUpdates(contactId, maxLatency);
with(any(long.class)));
will(returnValue(null)); will(returnValue(null));
// No subscription ack to send // No subscription ack to send
oneOf(db).generateSubscriptionAck(contactId); oneOf(db).generateSubscriptionAck(contactId);
will(returnValue(null)); will(returnValue(null));
// No subscription update to send // No subscription update to send
oneOf(db).generateSubscriptionUpdate(with(contactId), oneOf(db).generateSubscriptionUpdate(contactId, maxLatency);
with(any(long.class)));
will(returnValue(null)); will(returnValue(null));
// No retention ack to send // No retention ack to send
oneOf(db).generateRetentionAck(contactId); oneOf(db).generateRetentionAck(contactId);
will(returnValue(null)); will(returnValue(null));
// No retention update to send // No retention update to send
oneOf(db).generateRetentionUpdate(with(contactId), oneOf(db).generateRetentionUpdate(contactId, maxLatency);
with(any(long.class)));
will(returnValue(null)); will(returnValue(null));
// One ack to send // One ack to send
oneOf(db).generateAck(with(contactId), with(any(int.class))); oneOf(packetWriter).getMaxMessagesForAck(with(any(long.class)));
will(returnValue(new Ack(Arrays.asList(messageId)))); will(returnValue(MAX_MESSAGES_PER_ACK));
oneOf(db).generateAck(contactId, MAX_MESSAGES_PER_ACK);
will(returnValue(ack));
oneOf(packetWriter).writeAck(ack);
// No more acks // No more acks
oneOf(db).generateAck(with(contactId), with(any(int.class))); oneOf(packetWriter).getMaxMessagesForAck(with(any(long.class)));
will(returnValue(MAX_MESSAGES_PER_ACK));
oneOf(db).generateAck(contactId, MAX_MESSAGES_PER_ACK);
will(returnValue(null)); will(returnValue(null));
// One message to send // One message to send
oneOf(db).generateBatch(with(contactId), with(any(int.class)), oneOf(db).generateBatch(with(contactId), with(any(int.class)),
with(any(long.class))); with(maxLatency));
will(returnValue(Arrays.asList(raw))); will(returnValue(Arrays.asList(raw)));
oneOf(packetWriter).writeMessage(raw);
// No more messages // No more messages
oneOf(db).generateBatch(with(contactId), with(any(int.class)), oneOf(db).generateBatch(with(contactId), with(any(int.class)),
with(any(long.class))); with(maxLatency));
will(returnValue(null)); will(returnValue(null));
// Flush the output stream
oneOf(packetWriter).flush();
// Remove listener // Remove listener
oneOf(eventBus).removeListener(session); oneOf(eventBus).removeListener(session);
}}); }});
session.run(); session.run();
// Something should have been written
assertTrue(out.size() > UniqueId.LENGTH + raw.length);
context.assertIsSatisfied(); context.assertIsSatisfied();
} }
} }

View File

@@ -44,19 +44,19 @@ public class PluginManagerImplTest extends BriarTestCase {
context.mock(SimplexPluginFactory.class); context.mock(SimplexPluginFactory.class);
final SimplexPlugin simplexPlugin = context.mock(SimplexPlugin.class); final SimplexPlugin simplexPlugin = context.mock(SimplexPlugin.class);
final TransportId simplexId = new TransportId("simplex"); final TransportId simplexId = new TransportId("simplex");
final long simplexLatency = 12345; final int simplexLatency = 12345;
final SimplexPluginFactory simplexFailFactory = final SimplexPluginFactory simplexFailFactory =
context.mock(SimplexPluginFactory.class, "simplexFailFactory"); context.mock(SimplexPluginFactory.class, "simplexFailFactory");
final SimplexPlugin simplexFailPlugin = final SimplexPlugin simplexFailPlugin =
context.mock(SimplexPlugin.class, "simplexFailPlugin"); context.mock(SimplexPlugin.class, "simplexFailPlugin");
final TransportId simplexFailId = new TransportId("simplex1"); final TransportId simplexFailId = new TransportId("simplex1");
final long simplexFailLatency = 23456; final int simplexFailLatency = 23456;
// Two duplex plugin factories: one creates a plugin, the other fails // Two duplex plugin factories: one creates a plugin, the other fails
final DuplexPluginFactory duplexFactory = final DuplexPluginFactory duplexFactory =
context.mock(DuplexPluginFactory.class); context.mock(DuplexPluginFactory.class);
final DuplexPlugin duplexPlugin = context.mock(DuplexPlugin.class); final DuplexPlugin duplexPlugin = context.mock(DuplexPlugin.class);
final TransportId duplexId = new TransportId("duplex"); final TransportId duplexId = new TransportId("duplex");
final long duplexLatency = 34567; final int duplexLatency = 34567;
final DuplexPluginFactory duplexFailFactory = final DuplexPluginFactory duplexFailFactory =
context.mock(DuplexPluginFactory.class, "duplexFailFactory"); context.mock(DuplexPluginFactory.class, "duplexFailFactory");
final TransportId duplexFailId = new TransportId("duplex1"); final TransportId duplexFailId = new TransportId("duplex1");

View File

@@ -24,7 +24,7 @@ public class ModemPluginTest extends BriarTestCase {
final SerialPortList serialPortList = final SerialPortList serialPortList =
context.mock(SerialPortList.class); context.mock(SerialPortList.class);
final ModemPlugin plugin = new ModemPlugin(modemFactory, final ModemPlugin plugin = new ModemPlugin(modemFactory,
serialPortList, null, 0, 0, 0); serialPortList, null, 0, 0);
final Modem modem = context.mock(Modem.class); final Modem modem = context.mock(Modem.class);
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(serialPortList).getPortNames(); oneOf(serialPortList).getPortNames();
@@ -58,7 +58,7 @@ public class ModemPluginTest extends BriarTestCase {
final DuplexPluginCallback callback = final DuplexPluginCallback callback =
context.mock(DuplexPluginCallback.class); context.mock(DuplexPluginCallback.class);
final ModemPlugin plugin = new ModemPlugin(modemFactory, final ModemPlugin plugin = new ModemPlugin(modemFactory,
serialPortList, callback, 0, 0, 0); serialPortList, callback, 0, 0);
final Modem modem = context.mock(Modem.class); final Modem modem = context.mock(Modem.class);
final TransportProperties local = new TransportProperties(); final TransportProperties local = new TransportProperties();
local.put("iso3166", ISO_1336); local.put("iso3166", ISO_1336);
@@ -99,7 +99,7 @@ public class ModemPluginTest extends BriarTestCase {
final DuplexPluginCallback callback = final DuplexPluginCallback callback =
context.mock(DuplexPluginCallback.class); context.mock(DuplexPluginCallback.class);
final ModemPlugin plugin = new ModemPlugin(modemFactory, final ModemPlugin plugin = new ModemPlugin(modemFactory,
serialPortList, callback, 0, 0, 0); serialPortList, callback, 0, 0);
final Modem modem = context.mock(Modem.class); final Modem modem = context.mock(Modem.class);
final TransportProperties local = new TransportProperties(); final TransportProperties local = new TransportProperties();
local.put("iso3166", ISO_1336); local.put("iso3166", ISO_1336);
@@ -140,7 +140,7 @@ public class ModemPluginTest extends BriarTestCase {
final DuplexPluginCallback callback = final DuplexPluginCallback callback =
context.mock(DuplexPluginCallback.class); context.mock(DuplexPluginCallback.class);
final ModemPlugin plugin = new ModemPlugin(modemFactory, final ModemPlugin plugin = new ModemPlugin(modemFactory,
serialPortList, callback, 0, 0, 0); serialPortList, callback, 0, 0);
final Modem modem = context.mock(Modem.class); final Modem modem = context.mock(Modem.class);
final TransportProperties local = new TransportProperties(); final TransportProperties local = new TransportProperties();
local.put("iso3166", ISO_1336); local.put("iso3166", ISO_1336);

View File

@@ -26,9 +26,9 @@ import org.junit.Test;
public class KeyManagerImplTest extends BriarTestCase { public class KeyManagerImplTest extends BriarTestCase {
private static final long EPOCH = 1000L * 1000L * 1000L * 1000L; private static final long EPOCH = 1000L * 1000L * 1000L * 1000L;
private static final long MAX_LATENCY = 2 * 60 * 1000; // 2 minutes private static final int MAX_LATENCY = 2 * 60 * 1000; // 2 minutes
private static final long ROTATION_PERIOD_LENGTH = private static final int ROTATION_PERIOD =
MAX_LATENCY + MAX_CLOCK_DIFFERENCE; MAX_CLOCK_DIFFERENCE + MAX_LATENCY;
private final ContactId contactId; private final ContactId contactId;
private final TransportId transportId; private final TransportId transportId;
@@ -294,7 +294,7 @@ public class KeyManagerImplTest extends BriarTestCase {
MAX_LATENCY))); MAX_LATENCY)));
// The current time is the start of period 2 // The current time is the start of period 2
oneOf(clock).currentTimeMillis(); oneOf(clock).currentTimeMillis();
will(returnValue(EPOCH + ROTATION_PERIOD_LENGTH)); will(returnValue(EPOCH + ROTATION_PERIOD));
// The secret for period 3 should be derived and stored // The secret for period 3 should be derived and stored
oneOf(crypto).deriveNextSecret(secret0, 1); oneOf(crypto).deriveNextSecret(secret0, 1);
will(returnValue(secret1.clone())); will(returnValue(secret1.clone()));
@@ -353,7 +353,7 @@ public class KeyManagerImplTest extends BriarTestCase {
MAX_LATENCY))); MAX_LATENCY)));
// The current time is the end of period 3 // The current time is the end of period 3
oneOf(clock).currentTimeMillis(); oneOf(clock).currentTimeMillis();
will(returnValue(EPOCH + 3 * ROTATION_PERIOD_LENGTH - 1)); will(returnValue(EPOCH + 3 * ROTATION_PERIOD - 1));
// The secrets for periods 3 and 4 should be derived from secret 1 // The secrets for periods 3 and 4 should be derived from secret 1
oneOf(crypto).deriveNextSecret(secret1, 2); oneOf(crypto).deriveNextSecret(secret1, 2);
will(returnValue(secret2.clone())); will(returnValue(secret2.clone()));
@@ -484,7 +484,7 @@ public class KeyManagerImplTest extends BriarTestCase {
with(any(long.class)), with(any(long.class))); with(any(long.class)), with(any(long.class)));
// run() during period 2: the secrets should be rotated // run() during period 2: the secrets should be rotated
oneOf(clock).currentTimeMillis(); oneOf(clock).currentTimeMillis();
will(returnValue(EPOCH + ROTATION_PERIOD_LENGTH + 1)); will(returnValue(EPOCH + ROTATION_PERIOD + 1));
oneOf(crypto).deriveNextSecret(secret0, 1); oneOf(crypto).deriveNextSecret(secret0, 1);
will(returnValue(secret1.clone())); will(returnValue(secret1.clone()));
oneOf(crypto).deriveNextSecret(secret1, 2); oneOf(crypto).deriveNextSecret(secret1, 2);
@@ -559,7 +559,7 @@ public class KeyManagerImplTest extends BriarTestCase {
with(any(long.class)), with(any(long.class))); with(any(long.class)), with(any(long.class)));
// run() during period 3 (late): the secrets should be rotated // run() during period 3 (late): the secrets should be rotated
oneOf(clock).currentTimeMillis(); oneOf(clock).currentTimeMillis();
will(returnValue(EPOCH + 2 * ROTATION_PERIOD_LENGTH + 1)); will(returnValue(EPOCH + 2 * ROTATION_PERIOD + 1));
oneOf(crypto).deriveNextSecret(secret1, 2); oneOf(crypto).deriveNextSecret(secret1, 2);
will(returnValue(secret2.clone())); will(returnValue(secret2.clone()));
oneOf(crypto).deriveNextSecret(secret2, 3); oneOf(crypto).deriveNextSecret(secret2, 3);

View File

@@ -32,9 +32,9 @@ import org.junit.Test;
public class KeyRotationIntegrationTest extends BriarTestCase { public class KeyRotationIntegrationTest extends BriarTestCase {
private static final long EPOCH = 1000L * 1000L * 1000L * 1000L; private static final long EPOCH = 1000L * 1000L * 1000L * 1000L;
private static final long MAX_LATENCY = 2 * 60 * 1000; // 2 minutes private static final int MAX_LATENCY = 2 * 60 * 1000; // 2 minutes
private static final long ROTATION_PERIOD_LENGTH = private static final int ROTATION_PERIOD =
MAX_LATENCY + MAX_CLOCK_DIFFERENCE; MAX_CLOCK_DIFFERENCE + MAX_LATENCY;
private final ContactId contactId; private final ContactId contactId;
private final TransportId transportId; private final TransportId transportId;
@@ -653,7 +653,7 @@ public class KeyRotationIntegrationTest extends BriarTestCase {
MAX_LATENCY))); MAX_LATENCY)));
// The current time is the start of period 2 // The current time is the start of period 2
oneOf(clock).currentTimeMillis(); oneOf(clock).currentTimeMillis();
will(returnValue(EPOCH + ROTATION_PERIOD_LENGTH)); will(returnValue(EPOCH + ROTATION_PERIOD));
// The secret for period 3 should be derived and stored // The secret for period 3 should be derived and stored
oneOf(crypto).deriveNextSecret(secret0, 1); oneOf(crypto).deriveNextSecret(secret0, 1);
will(returnValue(secret1.clone())); will(returnValue(secret1.clone()));
@@ -778,7 +778,7 @@ public class KeyRotationIntegrationTest extends BriarTestCase {
MAX_LATENCY))); MAX_LATENCY)));
// The current time is the end of period 3 // The current time is the end of period 3
oneOf(clock).currentTimeMillis(); oneOf(clock).currentTimeMillis();
will(returnValue(EPOCH + 3 * ROTATION_PERIOD_LENGTH - 1)); will(returnValue(EPOCH + 3 * ROTATION_PERIOD - 1));
// The secrets for periods 3 and 4 should be derived from secret 1 // The secrets for periods 3 and 4 should be derived from secret 1
oneOf(crypto).deriveNextSecret(secret1, 2); oneOf(crypto).deriveNextSecret(secret1, 2);
will(returnValue(secret2.clone())); will(returnValue(secret2.clone()));