Retransmit messages based on maximum latency of transport.

This commit is contained in:
akwizgran
2013-02-06 15:11:55 +00:00
parent 5150737476
commit 9558bd88df
25 changed files with 134 additions and 76 deletions

View File

@@ -80,23 +80,25 @@ public interface DatabaseComponent {
/** /**
* Generates a batch of raw messages for the given contact, with a total * Generates a batch of raw messages for the given contact, with a total
* length less than or equal to the given length. Returns null if * length less than or equal to the given length, for transmission over a
* there are no sendable messages that fit in the given length. * transport with the given maximum latency. Returns null if there are no
* sendable messages that fit in the given length.
*/ */
Collection<byte[]> generateBatch(ContactId c, int maxLength) Collection<byte[]> generateBatch(ContactId c, int maxLength,
throws DbException; long maxLatency) throws DbException;
/** /**
* Generates a batch of raw messages for the given contact from the given * Generates a batch of raw messages for the given contact from the given
* collection of requested messages, with a total length less than or equal * collection of requested messages, with a total length less than or equal
* to the given length. Any messages that were either added to the batch, * to the given length, for transmission over a transport with the given
* or were considered but are no longer sendable to the contact, are * maximum latency. Any messages that were either added to the batch, or
* removed from the collection of requested messages before returning. * were considered but are no longer sendable to the contact, are removed
* Returns null if there are no sendable messages that fit in the given * from the collection of requested messages before returning. Returns null
* length. * if there are no sendable messages that fit in the given length.
*/ */
Collection<byte[]> generateBatch(ContactId c, int maxLength, Collection<byte[]> generateBatch(ContactId c, int maxLength,
Collection<MessageId> requested) throws DbException; long maxLatency, Collection<MessageId> requested)
throws DbException;
/** /**
* Generates an offer for the given contact. Returns null if there are no * Generates an offer for the given contact. Returns null if there are no

View File

@@ -11,6 +11,9 @@ import java.io.OutputStream;
*/ */
public interface DuplexTransportConnection { public interface DuplexTransportConnection {
/** Returns the maximum latency of the transport in milliseconds. */
long getMaxLatency();
/** Returns an input stream for reading from the connection. */ /** Returns an input stream for reading from the connection. */
InputStream getInputStream() throws IOException; InputStream getInputStream() throws IOException;

View File

@@ -12,6 +12,9 @@ public interface SimplexTransportWriter {
/** Returns the capacity of the transport in bytes. */ /** Returns the capacity of the transport in bytes. */
long getCapacity(); long getCapacity();
/** Returns the maximum latency of the transport in milliseconds. */
long getMaxLatency();
/** Returns an output stream for writing to the transport. */ /** Returns an output stream for writing to the transport. */
OutputStream getOutputStream() throws IOException; OutputStream getOutputStream() throws IOException;

View File

@@ -488,8 +488,8 @@ DatabaseCleaner.Callback {
return new Ack(acked); return new Ack(acked);
} }
public Collection<byte[]> generateBatch(ContactId c, int maxLength) public Collection<byte[]> generateBatch(ContactId c, int maxLength,
throws DbException { long maxLatency) throws DbException {
Collection<MessageId> ids; Collection<MessageId> ids;
List<byte[]> messages = new ArrayList<byte[]>(); List<byte[]> messages = new ArrayList<byte[]>();
// Get some sendable messages from the database // Get some sendable messages from the database
@@ -504,9 +504,8 @@ DatabaseCleaner.Callback {
if(!db.containsContact(txn, c)) if(!db.containsContact(txn, c))
throw new NoSuchContactException(); throw new NoSuchContactException();
ids = db.getSendableMessages(txn, c, maxLength); ids = db.getSendableMessages(txn, c, maxLength);
for(MessageId m : ids) { for(MessageId m : ids)
messages.add(db.getRawMessage(txn, m)); messages.add(db.getRawMessage(txn, m));
}
db.commitTransaction(txn); db.commitTransaction(txn);
} catch(DbException e) { } catch(DbException e) {
db.abortTransaction(txn); db.abortTransaction(txn);
@@ -519,13 +518,14 @@ DatabaseCleaner.Callback {
messageLock.readLock().unlock(); messageLock.readLock().unlock();
} }
if(messages.isEmpty()) return null; if(messages.isEmpty()) return null;
// Calculate the expiry time of the messages
long expiry = calculateExpiryTime(maxLatency);
// Record the messages as sent // Record the messages as sent
messageLock.writeLock().lock(); messageLock.writeLock().lock();
try { try {
T txn = db.startTransaction(); T txn = db.startTransaction();
try { try {
// FIXME: Calculate the expiry time db.addOutstandingMessages(txn, c, ids, expiry);
db.addOutstandingMessages(txn, c, ids, Long.MAX_VALUE);
db.commitTransaction(txn); db.commitTransaction(txn);
} catch(DbException e) { } catch(DbException e) {
db.abortTransaction(txn); db.abortTransaction(txn);
@@ -541,7 +541,8 @@ DatabaseCleaner.Callback {
} }
public Collection<byte[]> generateBatch(ContactId c, int maxLength, public Collection<byte[]> generateBatch(ContactId c, int maxLength,
Collection<MessageId> requested) throws DbException { long maxLatency, Collection<MessageId> requested)
throws DbException {
Collection<MessageId> ids = new ArrayList<MessageId>(); Collection<MessageId> ids = new ArrayList<MessageId>();
List<byte[]> messages = new ArrayList<byte[]>(); List<byte[]> messages = new ArrayList<byte[]>();
// Get some sendable messages from the database // Get some sendable messages from the database
@@ -579,13 +580,14 @@ DatabaseCleaner.Callback {
messageLock.readLock().unlock(); messageLock.readLock().unlock();
} }
if(messages.isEmpty()) return null; if(messages.isEmpty()) return null;
// Calculate the expiry times of the messages
long expiry = calculateExpiryTime(maxLatency);
// Record the messages as sent // Record the messages as sent
messageLock.writeLock().lock(); messageLock.writeLock().lock();
try { try {
T txn = db.startTransaction(); T txn = db.startTransaction();
try { try {
// FIXME: Calculate the expiry time db.addOutstandingMessages(txn, c, ids, expiry);
db.addOutstandingMessages(txn, c, ids, Long.MAX_VALUE);
db.commitTransaction(txn); db.commitTransaction(txn);
} catch(DbException e) { } catch(DbException e) {
db.abortTransaction(txn); db.abortTransaction(txn);
@@ -600,6 +602,14 @@ DatabaseCleaner.Callback {
return Collections.unmodifiableList(messages); return Collections.unmodifiableList(messages);
} }
private long calculateExpiryTime(long maxLatency) {
long roundTrip = maxLatency * 2;
if(roundTrip < 0) roundTrip = Long.MAX_VALUE; // Overflow
long expiry = clock.currentTimeMillis() + roundTrip;
if(expiry < 0) expiry = Long.MAX_VALUE; // Overflow
return expiry;
}
public Offer generateOffer(ContactId c, int maxMessages) public Offer generateOffer(ContactId c, int maxMessages)
throws DbException { throws DbException {
Collection<MessageId> offered; Collection<MessageId> offered;

View File

@@ -156,7 +156,6 @@ abstract class JdbcDatabase implements Database<Connection> {
+ " (messageId HASH NOT NULL," + " (messageId HASH NOT NULL,"
+ " contactId INT NOT NULL," + " contactId INT NOT NULL,"
+ " seen BOOLEAN NOT NULL," + " seen BOOLEAN NOT NULL,"
+ " transmissionCount INT NOT NULL,"
+ " expiry BIGINT NOT NULL," + " expiry BIGINT NOT NULL,"
+ " PRIMARY KEY (messageId, contactId)," + " PRIMARY KEY (messageId, contactId),"
+ " FOREIGN KEY (messageId)" + " FOREIGN KEY (messageId)"
@@ -651,16 +650,14 @@ abstract class JdbcDatabase implements Database<Connection> {
Collection<MessageId> sent, long expiry) throws DbException { Collection<MessageId> sent, long expiry) throws DbException {
PreparedStatement ps = null; PreparedStatement ps = null;
try { try {
// Update the transmission count and expiry time of each message // Update the expiry time of each message
String sql = "UPDATE statuses SET expiry = ?," String sql = "UPDATE statuses SET expiry = ?"
+ " transmissionCount = transmissionCount + ?"
+ " WHERE messageId = ? AND contactId = ?"; + " WHERE messageId = ? AND contactId = ?";
ps = txn.prepareStatement(sql); ps = txn.prepareStatement(sql);
ps.setLong(1, expiry); ps.setLong(1, expiry);
ps.setInt(2, 1); ps.setInt(3, c.getInt());
ps.setInt(4, c.getInt());
for(MessageId m : sent) { for(MessageId m : sent) {
ps.setBytes(3, m.getBytes()); ps.setBytes(2, m.getBytes());
ps.addBatch(); ps.addBatch();
} }
int[] batchAffected = ps.executeBatch(); int[] batchAffected = ps.executeBatch();
@@ -713,8 +710,8 @@ abstract class JdbcDatabase implements Database<Connection> {
PreparedStatement ps = null; PreparedStatement ps = null;
try { try {
String sql = "INSERT INTO statuses" String sql = "INSERT INTO statuses"
+ " (messageId, contactId, seen, transmissionCount, expiry)" + " (messageId, contactId, seen, expiry)"
+ " VALUES (?, ?, ?, ZERO(), ZERO())"; + " VALUES (?, ?, ?, ZERO())";
ps = txn.prepareStatement(sql); ps = txn.prepareStatement(sql);
ps.setBytes(1, m.getBytes()); ps.setBytes(1, m.getBytes());
ps.setInt(2, c.getInt()); ps.setInt(2, c.getInt());

View File

@@ -467,7 +467,8 @@ abstract class DuplexConnection implements DatabaseListener {
assert writer != null; assert writer != null;
try { try {
Collection<byte[]> batch = db.generateBatch(contactId, Collection<byte[]> batch = db.generateBatch(contactId,
Integer.MAX_VALUE, requested); Integer.MAX_VALUE, transport.getMaxLatency(),
requested);
if(batch == null) new GenerateOffer().run(); if(batch == null) new GenerateOffer().run();
else writerTasks.add(new WriteBatch(batch, requested)); else writerTasks.add(new WriteBatch(batch, requested));
} catch(DbException e) { } catch(DbException e) {

View File

@@ -69,6 +69,7 @@ class OutgoingSimplexConnection {
throw new EOFException(); throw new EOFException();
PacketWriter writer = packetWriterFactory.createPacketWriter(out, PacketWriter writer = packetWriterFactory.createPacketWriter(out,
transport.shouldFlush()); transport.shouldFlush());
long maxLatency = transport.getMaxLatency();
// Send the initial packets: updates and acks // Send the initial packets: updates and acks
boolean hasSpace = writeTransportAcks(conn, writer); boolean hasSpace = writeTransportAcks(conn, writer);
if(hasSpace) hasSpace = writeTransportUpdates(conn, writer); if(hasSpace) hasSpace = writeTransportUpdates(conn, writer);
@@ -89,12 +90,13 @@ class OutgoingSimplexConnection {
// Write messages until you can't write messages no more // Write messages until you can't write messages no more
capacity = conn.getRemainingCapacity(); capacity = conn.getRemainingCapacity();
int maxLength = (int) Math.min(capacity, MAX_PACKET_LENGTH); int maxLength = (int) Math.min(capacity, MAX_PACKET_LENGTH);
Collection<byte[]> batch = db.generateBatch(contactId, maxLength); Collection<byte[]> batch = db.generateBatch(contactId, maxLength,
maxLatency);
while(batch != null) { while(batch != null) {
for(byte[] raw : batch) writer.writeMessage(raw); for(byte[] raw : batch) writer.writeMessage(raw);
capacity = conn.getRemainingCapacity(); capacity = conn.getRemainingCapacity();
maxLength = (int) Math.min(capacity, MAX_PACKET_LENGTH); maxLength = (int) Math.min(capacity, MAX_PACKET_LENGTH);
batch = db.generateBatch(contactId, maxLength); batch = db.generateBatch(contactId, maxLength, maxLatency);
} }
writer.flush(); writer.flush();
writer.close(); writer.close();

View File

@@ -155,7 +155,7 @@ class BluetoothPlugin implements DuplexPlugin {
return; return;
} }
BluetoothTransportConnection conn = BluetoothTransportConnection conn =
new BluetoothTransportConnection(s); new BluetoothTransportConnection(s, maxLatency);
callback.incomingConnectionCreated(conn); callback.incomingConnectionCreated(conn);
if(!running) return; if(!running) return;
} }
@@ -202,7 +202,7 @@ class BluetoothPlugin implements DuplexPlugin {
private DuplexTransportConnection connect(String url) { private DuplexTransportConnection connect(String url) {
try { try {
StreamConnection s = (StreamConnection) Connector.open(url); StreamConnection s = (StreamConnection) Connector.open(url);
return new BluetoothTransportConnection(s); return new BluetoothTransportConnection(s, maxLatency);
} catch(IOException e) { } catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
return null; return null;
@@ -294,7 +294,7 @@ class BluetoothPlugin implements DuplexPlugin {
// Try to accept a connection and close the socket // Try to accept a connection and close the socket
try { try {
StreamConnection s = scn.acceptAndOpen(); StreamConnection s = scn.acceptAndOpen();
return new BluetoothTransportConnection(s); return new BluetoothTransportConnection(s, maxLatency);
} catch(IOException e) { } catch(IOException e) {
// This is expected when the socket is closed // This is expected when the socket is closed
if(LOG.isLoggable(INFO)) LOG.log(INFO, e.toString(), e); if(LOG.isLoggable(INFO)) LOG.log(INFO, e.toString(), e);

View File

@@ -11,9 +11,15 @@ import net.sf.briar.api.plugins.duplex.DuplexTransportConnection;
class BluetoothTransportConnection implements DuplexTransportConnection { class BluetoothTransportConnection implements DuplexTransportConnection {
private final StreamConnection stream; private final StreamConnection stream;
private final long maxLatency;
BluetoothTransportConnection(StreamConnection stream) { BluetoothTransportConnection(StreamConnection stream, long maxLatency) {
this.stream = stream; this.stream = stream;
this.maxLatency = maxLatency;
}
public long getMaxLatency() {
return maxLatency;
} }
public InputStream getInputStream() throws IOException { public InputStream getInputStream() throws IOException {

View File

@@ -188,7 +188,7 @@ class DroidtoothPlugin implements DuplexPlugin {
return; return;
} }
DroidtoothTransportConnection conn = DroidtoothTransportConnection conn =
new DroidtoothTransportConnection(s); new DroidtoothTransportConnection(s, maxLatency);
callback.incomingConnectionCreated(conn); callback.incomingConnectionCreated(conn);
if(!running) return; if(!running) return;
} }
@@ -250,7 +250,7 @@ class DroidtoothPlugin implements DuplexPlugin {
try { try {
BluetoothSocket s = InsecureBluetooth.createSocket(d, u); BluetoothSocket s = InsecureBluetooth.createSocket(d, u);
s.connect(); s.connect();
return new DroidtoothTransportConnection(s); return new DroidtoothTransportConnection(s, maxLatency);
} catch(IOException e) { } catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
return null; return null;
@@ -310,7 +310,8 @@ class DroidtoothPlugin implements DuplexPlugin {
} }
// Return the first connection received by the socket, if any // Return the first connection received by the socket, if any
try { try {
return new DroidtoothTransportConnection(ss.accept((int) timeout)); BluetoothSocket s = ss.accept((int) timeout);
return new DroidtoothTransportConnection(s, maxLatency);
} catch(SocketTimeoutException e) { } catch(SocketTimeoutException e) {
if(LOG.isLoggable(INFO)) LOG.info("Invitation timed out"); if(LOG.isLoggable(INFO)) LOG.info("Invitation timed out");
return null; return null;

View File

@@ -10,9 +10,15 @@ import android.bluetooth.BluetoothSocket;
class DroidtoothTransportConnection implements DuplexTransportConnection { class DroidtoothTransportConnection implements DuplexTransportConnection {
private final BluetoothSocket socket; private final BluetoothSocket socket;
private final long maxLatency;
DroidtoothTransportConnection(BluetoothSocket socket) { DroidtoothTransportConnection(BluetoothSocket socket, long maxLatency) {
this.socket = socket; this.socket = socket;
this.maxLatency = maxLatency;
}
public long getMaxLatency() {
return maxLatency;
} }
public InputStream getInputStream() throws IOException { public InputStream getInputStream() throws IOException {

View File

@@ -28,6 +28,7 @@ public abstract class FilePlugin implements SimplexPlugin {
protected final Executor pluginExecutor; protected final Executor pluginExecutor;
protected final SimplexPluginCallback callback; protected final SimplexPluginCallback callback;
protected final long maxLatency;
protected volatile boolean running = false; protected volatile boolean running = false;
@@ -37,9 +38,10 @@ public abstract class FilePlugin implements SimplexPlugin {
protected abstract void readerFinished(File f); protected abstract void readerFinished(File f);
protected FilePlugin(@PluginExecutor Executor pluginExecutor, protected FilePlugin(@PluginExecutor Executor pluginExecutor,
SimplexPluginCallback callback) { SimplexPluginCallback callback, long maxLatency) {
this.pluginExecutor = pluginExecutor; this.pluginExecutor = pluginExecutor;
this.callback = callback; this.callback = callback;
this.maxLatency = maxLatency;
} }
public SimplexTransportReader createReader(ContactId c) { public SimplexTransportReader createReader(ContactId c) {
@@ -72,7 +74,7 @@ public abstract class FilePlugin implements SimplexPlugin {
long capacity = getCapacity(dir.getPath()); long capacity = getCapacity(dir.getPath());
if(capacity < MIN_CONNECTION_LENGTH) return null; if(capacity < MIN_CONNECTION_LENGTH) return null;
OutputStream out = new FileOutputStream(f); OutputStream out = new FileOutputStream(f);
return new FileTransportWriter(f, out, capacity, this); return new FileTransportWriter(f, out, capacity, maxLatency, this);
} catch(IOException e) { } catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
f.delete(); f.delete();

View File

@@ -16,14 +16,15 @@ class FileTransportWriter implements SimplexTransportWriter {
private final File file; private final File file;
private final OutputStream out; private final OutputStream out;
private final long capacity; private final long capacity, maxLatency;
private final FilePlugin plugin; private final FilePlugin plugin;
FileTransportWriter(File file, OutputStream out, long capacity, FileTransportWriter(File file, OutputStream out, long capacity,
FilePlugin plugin) { long maxLatency, FilePlugin plugin) {
this.file = file; this.file = file;
this.out = out; this.out = out;
this.capacity = capacity; this.capacity = capacity;
this.maxLatency = maxLatency;
this.plugin = plugin; this.plugin = plugin;
} }
@@ -31,6 +32,10 @@ class FileTransportWriter implements SimplexTransportWriter {
return capacity; return capacity;
} }
public long getMaxLatency() {
return maxLatency;
}
public OutputStream getOutputStream() { public OutputStream getOutputStream() {
return out; return out;
} }

View File

@@ -31,15 +31,13 @@ implements RemovableDriveMonitor.Callback {
private final RemovableDriveFinder finder; private final RemovableDriveFinder finder;
private final RemovableDriveMonitor monitor; private final RemovableDriveMonitor monitor;
private final long maxLatency;
RemovableDrivePlugin(@PluginExecutor Executor pluginExecutor, RemovableDrivePlugin(@PluginExecutor Executor pluginExecutor,
SimplexPluginCallback callback, RemovableDriveFinder finder, SimplexPluginCallback callback, RemovableDriveFinder finder,
RemovableDriveMonitor monitor, long maxLatency) { RemovableDriveMonitor monitor, long maxLatency) {
super(pluginExecutor, callback); super(pluginExecutor, callback, maxLatency);
this.finder = finder; this.finder = finder;
this.monitor = monitor; this.monitor = monitor;
this.maxLatency = maxLatency;
} }
public TransportId getId() { public TransportId getId() {

View File

@@ -234,6 +234,10 @@ class ModemPlugin implements DuplexPlugin, Modem.Callback {
private final CountDownLatch finished = new CountDownLatch(1); private final CountDownLatch finished = new CountDownLatch(1);
public long getMaxLatency() {
return maxLatency;
}
public InputStream getInputStream() throws IOException { public InputStream getInputStream() throws IOException {
return modem.getInputStream(); return modem.getInputStream();
} }

View File

@@ -152,7 +152,7 @@ class LanTcpPlugin extends TcpPlugin {
// Connect back on the advertised TCP port // Connect back on the advertised TCP port
Socket s = new Socket(packet.getAddress(), port); Socket s = new Socket(packet.getAddress(), port);
s.setSoTimeout(0); s.setSoTimeout(0);
return new TcpTransportConnection(s); return new TcpTransportConnection(s, maxLatency);
} catch(IOException e) { } catch(IOException e) {
if(LOG.isLoggable(WARNING)) if(LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e); LOG.log(WARNING, e.toString(), e);
@@ -290,7 +290,7 @@ class LanTcpPlugin extends TcpPlugin {
ss.setSoTimeout(wait < 1 ? 1 : wait); ss.setSoTimeout(wait < 1 ? 1 : wait);
Socket s = ss.accept(); Socket s = ss.accept();
s.setSoTimeout(0); s.setSoTimeout(0);
return new TcpTransportConnection(s); return new TcpTransportConnection(s, maxLatency);
} catch(SocketTimeoutException e) { } catch(SocketTimeoutException e) {
now = clock.currentTimeMillis(); now = clock.currentTimeMillis();
if(now < end) { if(now < end) {

View File

@@ -131,8 +131,8 @@ abstract class TcpPlugin implements DuplexPlugin {
tryToClose(ss); tryToClose(ss);
return; return;
} }
TcpTransportConnection conn = new TcpTransportConnection(s); callback.incomingConnectionCreated(new TcpTransportConnection(s,
callback.incomingConnectionCreated(conn); maxLatency));
if(!running) return; if(!running) return;
} }
} }
@@ -177,7 +177,7 @@ abstract class TcpPlugin implements DuplexPlugin {
try { try {
s.setSoTimeout(0); s.setSoTimeout(0);
s.connect(addr); s.connect(addr);
return new TcpTransportConnection(s); return new TcpTransportConnection(s, maxLatency);
} catch(IOException e) { } catch(IOException e) {
if(LOG.isLoggable(INFO)) LOG.log(INFO, e.toString(), e); if(LOG.isLoggable(INFO)) LOG.log(INFO, e.toString(), e);
return null; return null;

View File

@@ -10,9 +10,15 @@ import net.sf.briar.api.plugins.duplex.DuplexTransportConnection;
class TcpTransportConnection implements DuplexTransportConnection { class TcpTransportConnection implements DuplexTransportConnection {
private final Socket socket; private final Socket socket;
private final long maxLatency;
TcpTransportConnection(Socket socket) { TcpTransportConnection(Socket socket, long maxLatency) {
this.socket = socket; this.socket = socket;
this.maxLatency = maxLatency;
}
public long getMaxLatency() {
return maxLatency;
} }
public InputStream getInputStream() throws IOException { public InputStream getInputStream() throws IOException {

View File

@@ -195,8 +195,8 @@ class TorPlugin implements DuplexPlugin {
tryToClose(ss); tryToClose(ss);
return; return;
} }
TorTransportConnection conn = new TorTransportConnection(s); callback.incomingConnectionCreated(new TorTransportConnection(s,
callback.incomingConnectionCreated(conn); maxLatency));
synchronized(this) { synchronized(this) {
if(!running) return; if(!running) return;
} }
@@ -277,7 +277,7 @@ class TorPlugin implements DuplexPlugin {
if(LOG.isLoggable(INFO)) LOG.info("Connecting to hidden service"); if(LOG.isLoggable(INFO)) LOG.info("Connecting to hidden service");
NetSocket s = nl.createNetSocket(null, null, addr); NetSocket s = nl.createNetSocket(null, null, addr);
if(LOG.isLoggable(INFO)) LOG.info("Connected to hidden service"); if(LOG.isLoggable(INFO)) LOG.info("Connected to hidden service");
return new TorTransportConnection(s); return new TorTransportConnection(s, maxLatency);
} catch(IOException e) { } catch(IOException e) {
if(LOG.isLoggable(INFO)) LOG.log(INFO, e.toString(), e); if(LOG.isLoggable(INFO)) LOG.log(INFO, e.toString(), e);
return null; return null;

View File

@@ -11,9 +11,15 @@ import org.silvertunnel.netlib.api.NetSocket;
class TorTransportConnection implements DuplexTransportConnection { class TorTransportConnection implements DuplexTransportConnection {
private final NetSocket socket; private final NetSocket socket;
private final long maxLatency;
TorTransportConnection(NetSocket socket) { TorTransportConnection(NetSocket socket, long maxLatency) {
this.socket = socket; this.socket = socket;
this.maxLatency = maxLatency;
}
public long getMaxLatency() {
return maxLatency;
} }
public InputStream getInputStream() throws IOException { public InputStream getInputStream() throws IOException {

View File

@@ -503,12 +503,12 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
} catch(NoSuchContactException expected) {} } catch(NoSuchContactException expected) {}
try { try {
db.generateBatch(contactId, 123); db.generateBatch(contactId, 123, 456);
fail(); fail();
} catch(NoSuchContactException expected) {} } catch(NoSuchContactException expected) {}
try { try {
db.generateBatch(contactId, 123, Arrays.asList(messageId)); db.generateBatch(contactId, 123, 456, Arrays.asList(messageId));
fail(); fail();
} catch(NoSuchContactException expected) {} } catch(NoSuchContactException expected) {}
@@ -696,14 +696,14 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
oneOf(database).getRawMessage(txn, messageId1); oneOf(database).getRawMessage(txn, messageId1);
will(returnValue(raw1)); will(returnValue(raw1));
// Record the outstanding messages // Record the outstanding messages
// FIXME: Calculate the expiry time
oneOf(database).addOutstandingMessages(txn, contactId, sendable, oneOf(database).addOutstandingMessages(txn, contactId, sendable,
Long.MAX_VALUE); Long.MAX_VALUE);
}}); }});
DatabaseComponent db = createDatabaseComponent(database, cleaner, DatabaseComponent db = createDatabaseComponent(database, cleaner,
shutdown); shutdown);
assertEquals(messages, db.generateBatch(contactId, size * 2)); assertEquals(messages, db.generateBatch(contactId, size * 2,
Long.MAX_VALUE));
context.assertIsSatisfied(); context.assertIsSatisfied();
} }
@@ -733,16 +733,15 @@ public abstract class DatabaseComponentTest extends BriarTestCase {
will(returnValue(raw1)); // Message is sendable will(returnValue(raw1)); // Message is sendable
oneOf(database).getRawMessageIfSendable(txn, contactId, messageId2); oneOf(database).getRawMessageIfSendable(txn, contactId, messageId2);
will(returnValue(null)); // Message is not sendable will(returnValue(null)); // Message is not sendable
// Record the outstanding messages // Record the outstanding message
// FIXME: Calculate the expiry time
oneOf(database).addOutstandingMessages(txn, contactId, oneOf(database).addOutstandingMessages(txn, contactId,
Collections.singletonList(messageId1), Long.MAX_VALUE); Arrays.asList(messageId1), Long.MAX_VALUE);
}}); }});
DatabaseComponent db = createDatabaseComponent(database, cleaner, DatabaseComponent db = createDatabaseComponent(database, cleaner,
shutdown); shutdown);
assertEquals(messages, db.generateBatch(contactId, size * 3, assertEquals(messages, db.generateBatch(contactId, size * 3,
requested)); Long.MAX_VALUE, requested));
context.assertIsSatisfied(); context.assertIsSatisfied();
} }

View File

@@ -524,9 +524,8 @@ public class H2DatabaseTest extends BriarTestCase {
assertTrue(it.hasNext()); assertTrue(it.hasNext());
assertEquals(messageId, it.next()); assertEquals(messageId, it.next());
assertFalse(it.hasNext()); assertFalse(it.hasNext());
// FIXME: Calculate the expiry time db.addOutstandingMessages(txn, contactId, Arrays.asList(messageId),
db.addOutstandingMessages(txn, contactId, Long.MAX_VALUE);
Collections.singletonList(messageId), Long.MAX_VALUE);
// The message should no longer be sendable // The message should no longer be sendable
it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator();

View File

@@ -87,7 +87,7 @@ public class OutgoingSimplexConnectionTest extends BriarTestCase {
public void testConnectionTooShort() throws Exception { public void testConnectionTooShort() throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
TestSimplexTransportWriter transport = new TestSimplexTransportWriter( TestSimplexTransportWriter transport = new TestSimplexTransportWriter(
out, MAX_PACKET_LENGTH, true); out, MAX_PACKET_LENGTH, Long.MAX_VALUE, true);
ConnectionContext ctx = new ConnectionContext(contactId, transportId, ConnectionContext ctx = new ConnectionContext(contactId, transportId,
secret, 0, true); secret, 0, true);
OutgoingSimplexConnection connection = new OutgoingSimplexConnection(db, OutgoingSimplexConnection connection = new OutgoingSimplexConnection(db,
@@ -105,7 +105,7 @@ public class OutgoingSimplexConnectionTest extends BriarTestCase {
public void testNothingToSend() throws Exception { public void testNothingToSend() throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
TestSimplexTransportWriter transport = new TestSimplexTransportWriter( TestSimplexTransportWriter transport = new TestSimplexTransportWriter(
out, MIN_CONNECTION_LENGTH, true); out, MIN_CONNECTION_LENGTH, Long.MAX_VALUE, true);
ConnectionContext ctx = new ConnectionContext(contactId, transportId, ConnectionContext ctx = new ConnectionContext(contactId, transportId,
secret, 0, true); secret, 0, true);
OutgoingSimplexConnection connection = new OutgoingSimplexConnection(db, OutgoingSimplexConnection connection = new OutgoingSimplexConnection(db,
@@ -134,7 +134,8 @@ public class OutgoingSimplexConnectionTest extends BriarTestCase {
oneOf(db).generateAck(with(contactId), with(any(int.class))); oneOf(db).generateAck(with(contactId), with(any(int.class)));
will(returnValue(null)); will(returnValue(null));
// No messages to send // No messages to send
oneOf(db).generateBatch(with(contactId), with(any(int.class))); oneOf(db).generateBatch(with(contactId), with(any(int.class)),
with(any(long.class)));
will(returnValue(null)); will(returnValue(null));
}}); }});
connection.write(); connection.write();
@@ -150,7 +151,7 @@ public class OutgoingSimplexConnectionTest extends BriarTestCase {
public void testSomethingToSend() throws Exception { public void testSomethingToSend() throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
TestSimplexTransportWriter transport = new TestSimplexTransportWriter( TestSimplexTransportWriter transport = new TestSimplexTransportWriter(
out, MIN_CONNECTION_LENGTH, true); out, MIN_CONNECTION_LENGTH, Long.MAX_VALUE, true);
ConnectionContext ctx = new ConnectionContext(contactId, transportId, ConnectionContext ctx = new ConnectionContext(contactId, transportId,
secret, 0, true); secret, 0, true);
OutgoingSimplexConnection connection = new OutgoingSimplexConnection(db, OutgoingSimplexConnection connection = new OutgoingSimplexConnection(db,
@@ -183,10 +184,12 @@ public class OutgoingSimplexConnectionTest extends BriarTestCase {
oneOf(db).generateAck(with(contactId), with(any(int.class))); oneOf(db).generateAck(with(contactId), with(any(int.class)));
will(returnValue(null)); will(returnValue(null));
// One message to send // One message to send
oneOf(db).generateBatch(with(contactId), with(any(int.class))); oneOf(db).generateBatch(with(contactId), with(any(int.class)),
with(any(long.class)));
will(returnValue(Collections.singletonList(raw))); will(returnValue(Collections.singletonList(raw)));
// No more messages // No more messages
oneOf(db).generateBatch(with(contactId), with(any(int.class))); oneOf(db).generateBatch(with(contactId), with(any(int.class)),
with(any(long.class)));
will(returnValue(null)); will(returnValue(null));
}}); }});
connection.write(); connection.write();

View File

@@ -126,7 +126,7 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
PacketWriterFactory packetWriterFactory = PacketWriterFactory packetWriterFactory =
alice.getInstance(PacketWriterFactory.class); alice.getInstance(PacketWriterFactory.class);
TestSimplexTransportWriter transport = new TestSimplexTransportWriter( TestSimplexTransportWriter transport = new TestSimplexTransportWriter(
out, Long.MAX_VALUE, false); out, Long.MAX_VALUE, Long.MAX_VALUE, false);
ConnectionContext ctx = km.getConnectionContext(contactId, transportId); ConnectionContext ctx = km.getConnectionContext(contactId, transportId);
assertNotNull(ctx); assertNotNull(ctx);
OutgoingSimplexConnection simplex = new OutgoingSimplexConnection(db, OutgoingSimplexConnection simplex = new OutgoingSimplexConnection(db,

View File

@@ -8,15 +8,16 @@ import net.sf.briar.api.plugins.simplex.SimplexTransportWriter;
class TestSimplexTransportWriter implements SimplexTransportWriter { class TestSimplexTransportWriter implements SimplexTransportWriter {
private final ByteArrayOutputStream out; private final ByteArrayOutputStream out;
private final long capacity; private final long capacity, maxLatency;
private final boolean flush; private final boolean flush;
private boolean disposed = false, exception = false; private boolean disposed = false, exception = false;
TestSimplexTransportWriter(ByteArrayOutputStream out, long capacity, TestSimplexTransportWriter(ByteArrayOutputStream out, long capacity,
boolean flush) { long maxLatency, boolean flush) {
this.out = out; this.out = out;
this.capacity = capacity; this.capacity = capacity;
this.maxLatency = maxLatency;
this.flush = flush; this.flush = flush;
} }
@@ -24,6 +25,10 @@ class TestSimplexTransportWriter implements SimplexTransportWriter {
return capacity; return capacity;
} }
public long getMaxLatency() {
return maxLatency;
}
public OutputStream getOutputStream() { public OutputStream getOutputStream() {
return out; return out;
} }