Use the transport's idle timeout, not a hardcoded value.

This commit is contained in:
akwizgran
2014-12-14 15:18:39 +00:00
parent d4fa656dbb
commit 29a6596ee3
15 changed files with 77 additions and 26 deletions

View File

@@ -68,6 +68,10 @@ class DroidtoothTransportConnection implements DuplexTransportConnection {
return plugin.getMaxLatency(); return plugin.getMaxLatency();
} }
public long getMaxIdleTime() {
return plugin.getMaxIdleTime();
}
public long getCapacity() { public long getCapacity() {
return Long.MAX_VALUE; return Long.MAX_VALUE;
} }

View File

@@ -67,6 +67,10 @@ class TorTransportConnection implements DuplexTransportConnection {
return plugin.getMaxLatency(); return plugin.getMaxLatency();
} }
public long getMaxIdleTime() {
return plugin.getMaxIdleTime();
}
public long getCapacity() { public long getCapacity() {
return Long.MAX_VALUE; return Long.MAX_VALUE;
} }

View File

@@ -11,6 +11,9 @@ public interface MessagingSessionFactory {
MessagingSession createIncomingSession(ContactId c, TransportId t, MessagingSession createIncomingSession(ContactId c, TransportId t,
InputStream in); InputStream in);
MessagingSession createOutgoingSession(ContactId c, TransportId t, MessagingSession createSimplexOutgoingSession(ContactId c, TransportId t,
long maxLatency, boolean duplex, OutputStream out); long maxLatency, OutputStream out);
MessagingSession createDuplexOutgoingSession(ContactId c, TransportId t,
long maxLatency, long maxIdleTime, OutputStream out);
} }

View File

@@ -17,6 +17,9 @@ public interface Plugin {
/** Returns the transport's maximum latency in milliseconds. */ /** Returns the transport's maximum latency in milliseconds. */
long getMaxLatency(); long getMaxLatency();
/** Returns the transport's maximum idle time in milliseconds. */
long getMaxIdleTime();
/** Starts the plugin and returns true if it started successfully. */ /** Starts the plugin and returns true if it started successfully. */
boolean start() throws IOException; boolean start() throws IOException;

View File

@@ -15,6 +15,9 @@ public interface TransportConnectionWriter {
/** Returns the maximum latency of the transport in milliseconds. */ /** Returns the maximum latency of the transport in milliseconds. */
long getMaxLatency(); long getMaxLatency();
/** Returns the maximum idle time of the transport in milliseconds. */
long getMaxIdleTime();
/** Returns the capacity of the transport connection in bytes. */ /** Returns the capacity of the transport connection in bytes. */
long getCapacity(); long getCapacity();

View File

@@ -7,9 +7,6 @@ import org.briarproject.api.plugins.Plugin;
/** An interface for transport plugins that support duplex communication. */ /** An interface for transport plugins that support duplex communication. */
public interface DuplexPlugin extends Plugin { public interface DuplexPlugin extends Plugin {
/** Returns the transport's maximum idle time in milliseconds. */
long getMaxIdleTime();
/** /**
* Attempts to create and return a connection to the given contact using * Attempts to create and return a connection to the given contact using
* the current transport and configuration properties. Returns null if a * the current transport and configuration properties. Returns null if a

View File

@@ -55,7 +55,6 @@ import org.briarproject.api.messaging.TransportUpdate;
*/ */
class DuplexOutgoingSession implements MessagingSession, EventListener { class DuplexOutgoingSession implements MessagingSession, EventListener {
private static final int MAX_IDLE_TIME = 30 * 1000; // Milliseconds
private static final Logger LOG = private static final Logger LOG =
Logger.getLogger(DuplexOutgoingSession.class.getName()); Logger.getLogger(DuplexOutgoingSession.class.getName());
@@ -69,7 +68,7 @@ class DuplexOutgoingSession implements MessagingSession, EventListener {
private final EventBus eventBus; private final EventBus eventBus;
private final ContactId contactId; private final ContactId contactId;
private final TransportId transportId; private final TransportId transportId;
private final long maxLatency; private final long maxLatency, maxIdleTime;
private final OutputStream out; private final OutputStream out;
private final PacketWriter packetWriter; private final PacketWriter packetWriter;
private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks; private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
@@ -79,13 +78,14 @@ class DuplexOutgoingSession implements MessagingSession, EventListener {
DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
EventBus eventBus, PacketWriterFactory packetWriterFactory, EventBus eventBus, PacketWriterFactory packetWriterFactory,
ContactId contactId, TransportId transportId, long maxLatency, ContactId contactId, TransportId transportId, long maxLatency,
OutputStream out) { long maxIdleTime, OutputStream out) {
this.db = db; this.db = db;
this.dbExecutor = dbExecutor; this.dbExecutor = dbExecutor;
this.eventBus = eventBus; this.eventBus = eventBus;
this.contactId = contactId; this.contactId = contactId;
this.transportId = transportId; this.transportId = transportId;
this.maxLatency = maxLatency; this.maxLatency = maxLatency;
this.maxIdleTime = maxIdleTime;
this.out = out; this.out = out;
packetWriter = packetWriterFactory.createPacketWriter(out); packetWriter = packetWriterFactory.createPacketWriter(out);
writerTasks = new LinkedBlockingQueue<ThrowingRunnable<IOException>>(); writerTasks = new LinkedBlockingQueue<ThrowingRunnable<IOException>>();
@@ -110,8 +110,8 @@ class DuplexOutgoingSession implements MessagingSession, EventListener {
while(!interrupted) { while(!interrupted) {
// Flush the stream if it's going to be idle // Flush the stream if it's going to be idle
if(writerTasks.isEmpty()) out.flush(); if(writerTasks.isEmpty()) out.flush();
ThrowingRunnable<IOException> task = writerTasks.poll( ThrowingRunnable<IOException> task =
MAX_IDLE_TIME, MILLISECONDS); writerTasks.poll(maxIdleTime, MILLISECONDS);
if(task == null) { if(task == null) {
LOG.info("Idle timeout"); LOG.info("Idle timeout");
continue; // Flush and wait again continue; // Flush and wait again

View File

@@ -49,11 +49,16 @@ class MessagingSessionFactoryImpl implements MessagingSessionFactory {
messageVerifier, packetReaderFactory, c, t, in); messageVerifier, packetReaderFactory, c, t, in);
} }
public MessagingSession createOutgoingSession(ContactId c, TransportId t, public MessagingSession createSimplexOutgoingSession(ContactId c,
long maxLatency, boolean duplex, OutputStream out) { TransportId t, long maxLatency, OutputStream out) {
if(duplex) return new DuplexOutgoingSession(db, dbExecutor, eventBus, return new SimplexOutgoingSession(db, dbExecutor, eventBus,
packetWriterFactory, c, t, maxLatency, out);
else return new SimplexOutgoingSession(db, dbExecutor, eventBus,
packetWriterFactory, c, t, maxLatency, out); packetWriterFactory, c, t, maxLatency, out);
} }
public MessagingSession createDuplexOutgoingSession(ContactId c,
TransportId t, long maxLatency, long maxIdleTime,
OutputStream out) {
return new DuplexOutgoingSession(db, dbExecutor, eventBus,
packetWriterFactory, c, t, maxLatency, maxIdleTime, out);
}
} }

View File

@@ -107,14 +107,27 @@ class ConnectionManagerImpl implements ConnectionManager {
} }
} }
private MessagingSession createOutgoingSession(StreamContext ctx, private MessagingSession createSimplexOutgoingSession(StreamContext ctx,
TransportConnectionWriter w, boolean duplex) throws IOException { TransportConnectionWriter w) throws IOException {
try { try {
StreamWriter streamWriter = streamWriterFactory.createStreamWriter( StreamWriter streamWriter = streamWriterFactory.createStreamWriter(
w.getOutputStream(), w.getMaxFrameLength(), ctx); w.getOutputStream(), w.getMaxFrameLength(), ctx);
return messagingSessionFactory.createOutgoingSession( return messagingSessionFactory.createSimplexOutgoingSession(
ctx.getContactId(), ctx.getTransportId(), w.getMaxLatency(), ctx.getContactId(), ctx.getTransportId(), w.getMaxLatency(),
duplex, streamWriter.getOutputStream()); streamWriter.getOutputStream());
} finally {
ByteUtils.erase(ctx.getSecret());
}
}
private MessagingSession createDuplexOutgoingSession(StreamContext ctx,
TransportConnectionWriter w) throws IOException {
try {
StreamWriter streamWriter = streamWriterFactory.createStreamWriter(
w.getOutputStream(), w.getMaxFrameLength(), ctx);
return messagingSessionFactory.createDuplexOutgoingSession(
ctx.getContactId(), ctx.getTransportId(), w.getMaxLatency(),
w.getMaxIdleTime(), streamWriter.getOutputStream());
} finally { } finally {
ByteUtils.erase(ctx.getSecret()); ByteUtils.erase(ctx.getSecret());
} }
@@ -199,7 +212,7 @@ class ConnectionManagerImpl implements ConnectionManager {
connectionRegistry.registerConnection(contactId, transportId); connectionRegistry.registerConnection(contactId, transportId);
try { try {
// Create and run the outgoing session // Create and run the outgoing session
createOutgoingSession(ctx, writer, false).run(); createSimplexOutgoingSession(ctx, writer).run();
disposeWriter(false); disposeWriter(false);
} catch(IOException e) { } catch(IOException e) {
if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
@@ -287,7 +300,7 @@ class ConnectionManagerImpl implements ConnectionManager {
} }
try { try {
// Create and run the outgoing session // Create and run the outgoing session
outgoingSession = createOutgoingSession(ctx, writer, true); outgoingSession = createDuplexOutgoingSession(ctx, writer);
outgoingSession.run(); outgoingSession.run();
disposeWriter(false); disposeWriter(false);
} catch(IOException e) { } catch(IOException e) {
@@ -353,7 +366,7 @@ class ConnectionManagerImpl implements ConnectionManager {
}); });
try { try {
// Create and run the outgoing session // Create and run the outgoing session
outgoingSession = createOutgoingSession(ctx, writer, true); outgoingSession = createDuplexOutgoingSession(ctx, writer);
outgoingSession.run(); outgoingSession.run();
disposeWriter(false); disposeWriter(false);
} catch(IOException e) { } catch(IOException e) {

View File

@@ -56,6 +56,10 @@ public abstract class FilePlugin implements SimplexPlugin {
return maxLatency; return maxLatency;
} }
public long getMaxIdleTime() {
return Long.MAX_VALUE; // We don't need keepalives
}
public boolean isRunning() { public boolean isRunning() {
return running; return running;
} }

View File

@@ -35,6 +35,10 @@ class FileTransportWriter implements TransportConnectionWriter {
return plugin.getMaxLatency(); return plugin.getMaxLatency();
} }
public long getMaxIdleTime() {
return plugin.getMaxIdleTime();
}
public long getCapacity() { public long getCapacity() {
return capacity; return capacity;
} }

View File

@@ -67,6 +67,10 @@ class TcpTransportConnection implements DuplexTransportConnection {
return plugin.getMaxLatency(); return plugin.getMaxLatency();
} }
public long getMaxIdleTime() {
return plugin.getMaxIdleTime();
}
public long getCapacity() { public long getCapacity() {
return Long.MAX_VALUE; return Long.MAX_VALUE;
} }

View File

@@ -68,6 +68,10 @@ class BluetoothTransportConnection implements DuplexTransportConnection {
return plugin.getMaxLatency(); return plugin.getMaxLatency();
} }
public long getMaxIdleTime() {
return plugin.getMaxIdleTime();
}
public long getCapacity() { public long getCapacity() {
return Long.MAX_VALUE; return Long.MAX_VALUE;
} }

View File

@@ -29,8 +29,7 @@ implements RemovableDriveMonitor.Callback {
RemovableDrivePlugin(Executor ioExecutor, FileUtils fileUtils, RemovableDrivePlugin(Executor ioExecutor, FileUtils fileUtils,
SimplexPluginCallback callback, RemovableDriveFinder finder, SimplexPluginCallback callback, RemovableDriveFinder finder,
RemovableDriveMonitor monitor, int maxFrameLength, RemovableDriveMonitor monitor, int maxFrameLength, long maxLatency) {
long maxLatency) {
super(ioExecutor, fileUtils, callback, maxFrameLength, maxLatency); super(ioExecutor, fileUtils, callback, maxFrameLength, maxLatency);
this.finder = finder; this.finder = finder;
this.monitor = monitor; this.monitor = monitor;

View File

@@ -223,11 +223,15 @@ class ModemPlugin implements DuplexPlugin, Modem.Callback {
private class Writer implements TransportConnectionWriter { private class Writer implements TransportConnectionWriter {
public int getMaxFrameLength() { public int getMaxFrameLength() {
return maxFrameLength; return getMaxFrameLength();
} }
public long getMaxLatency() { public long getMaxLatency() {
return maxLatency; return getMaxLatency();
}
public long getMaxIdleTime() {
return getMaxIdleTime();
} }
public long getCapacity() { public long getCapacity() {