Application layer keepalives to detect dead TCP connections.

DuplexOutgoingSession flushes its output stream if it's idle for a
transport-defined interval, causing an empty frame to be sent. The TCP
and Tor plugins use a socket timeout equal to twice the idle interval to
detect dead connections.

See bugs #27, #46 and #60.
This commit is contained in:
akwizgran
2014-12-13 12:00:40 +00:00
parent 3a70aa7653
commit d4fa656dbb
19 changed files with 95 additions and 26 deletions

View File

@@ -107,6 +107,11 @@ class DroidtoothPlugin implements DuplexPlugin {
return maxLatency; return maxLatency;
} }
public long getMaxIdleTime() {
// Bluetooth detects dead connections so we don't need keepalives
return Long.MAX_VALUE;
}
public boolean start() throws IOException { public boolean start() throws IOException {
// BluetoothAdapter.getDefaultAdapter() must be called on a thread // BluetoothAdapter.getDefaultAdapter() must be called on a thread
// with a message queue, so submit it to the AndroidExecutor // with a message queue, so submit it to the AndroidExecutor
@@ -361,6 +366,7 @@ class DroidtoothPlugin implements DuplexPlugin {
private class BluetoothStateReceiver extends BroadcastReceiver { private class BluetoothStateReceiver extends BroadcastReceiver {
@Override
public void onReceive(Context ctx, Intent intent) { public void onReceive(Context ctx, Intent intent) {
int state = intent.getIntExtra(EXTRA_STATE, 0); int state = intent.getIntExtra(EXTRA_STATE, 0);
if(state == STATE_ON) { if(state == STATE_ON) {

View File

@@ -27,8 +27,8 @@ class AndroidLanTcpPlugin extends LanTcpPlugin {
AndroidLanTcpPlugin(Executor ioExecutor, Context appContext, AndroidLanTcpPlugin(Executor ioExecutor, Context appContext,
DuplexPluginCallback callback, int maxFrameLength, long maxLatency, DuplexPluginCallback callback, int maxFrameLength, long maxLatency,
long pollingInterval) { long maxIdleTime, long pollingInterval) {
super(ioExecutor, callback, maxFrameLength, maxLatency, super(ioExecutor, callback, maxFrameLength, maxLatency, maxIdleTime,
pollingInterval); pollingInterval);
this.appContext = appContext; this.appContext = appContext;
} }

View File

@@ -13,6 +13,7 @@ public class AndroidLanTcpPluginFactory implements DuplexPluginFactory {
private static final int MAX_FRAME_LENGTH = 1024; private static final int MAX_FRAME_LENGTH = 1024;
private static final long MAX_LATENCY = 60 * 1000; // 1 minute private static final long MAX_LATENCY = 60 * 1000; // 1 minute
private static final long MAX_IDLE_TIME = 30 * 1000; // 30 seconds
private static final long POLLING_INTERVAL = 60 * 1000; // 1 minute private static final long POLLING_INTERVAL = 60 * 1000; // 1 minute
private final Executor ioExecutor; private final Executor ioExecutor;
@@ -29,6 +30,6 @@ public class AndroidLanTcpPluginFactory implements DuplexPluginFactory {
public DuplexPlugin createPlugin(DuplexPluginCallback callback) { public DuplexPlugin createPlugin(DuplexPluginCallback callback) {
return new AndroidLanTcpPlugin(ioExecutor, appContext, callback, return new AndroidLanTcpPlugin(ioExecutor, appContext, callback,
MAX_FRAME_LENGTH, MAX_LATENCY, POLLING_INTERVAL); MAX_FRAME_LENGTH, MAX_LATENCY, MAX_IDLE_TIME, POLLING_INTERVAL);
} }
} }

View File

@@ -75,8 +75,8 @@ class TorPlugin implements DuplexPlugin, EventHandler {
private final Context appContext; private final Context appContext;
private final LocationUtils locationUtils; private final LocationUtils locationUtils;
private final DuplexPluginCallback callback; private final DuplexPluginCallback callback;
private final int maxFrameLength; private final int maxFrameLength, socketTimeout;
private final long maxLatency, pollingInterval; private final long maxLatency, maxIdleTime, pollingInterval;
private final File torDirectory, torFile, geoIpFile, configFile, doneFile; private final File torDirectory, torFile, geoIpFile, configFile, doneFile;
private final File cookieFile, hostnameFile; private final File cookieFile, hostnameFile;
private final AtomicBoolean circuitBuilt; private final AtomicBoolean circuitBuilt;
@@ -90,14 +90,19 @@ class TorPlugin implements DuplexPlugin, EventHandler {
TorPlugin(Executor ioExecutor, Context appContext, TorPlugin(Executor ioExecutor, Context appContext,
LocationUtils locationUtils, DuplexPluginCallback callback, LocationUtils locationUtils, DuplexPluginCallback callback,
int maxFrameLength, long maxLatency, long pollingInterval) { int maxFrameLength, long maxLatency, long maxIdleTime,
long pollingInterval) {
this.ioExecutor = ioExecutor; this.ioExecutor = ioExecutor;
this.appContext = appContext; this.appContext = appContext;
this.locationUtils = locationUtils; this.locationUtils = locationUtils;
this.callback = callback; this.callback = callback;
this.maxFrameLength = maxFrameLength; this.maxFrameLength = maxFrameLength;
this.maxLatency = maxLatency; this.maxLatency = maxLatency;
this.maxIdleTime = maxIdleTime;
this.pollingInterval = pollingInterval; this.pollingInterval = pollingInterval;
if(2 * maxIdleTime > Integer.MAX_VALUE)
socketTimeout = Integer.MAX_VALUE;
else socketTimeout = (int) (2 * maxIdleTime);
torDirectory = appContext.getDir("tor", MODE_PRIVATE); torDirectory = appContext.getDir("tor", MODE_PRIVATE);
torFile = new File(torDirectory, "tor"); torFile = new File(torDirectory, "tor");
geoIpFile = new File(torDirectory, "geoip"); geoIpFile = new File(torDirectory, "geoip");
@@ -120,6 +125,10 @@ class TorPlugin implements DuplexPlugin, EventHandler {
return maxLatency; return maxLatency;
} }
public long getMaxIdleTime() {
return maxIdleTime;
}
public boolean start() throws IOException { public boolean start() throws IOException {
// Try to connect to an existing Tor process if there is one // Try to connect to an existing Tor process if there is one
boolean startProcess = false; boolean startProcess = false;
@@ -446,6 +455,7 @@ class TorPlugin implements DuplexPlugin, EventHandler {
Socket s; Socket s;
try { try {
s = ss.accept(); s = ss.accept();
s.setSoTimeout(socketTimeout);
} catch(IOException e) { } catch(IOException e) {
// This is expected when the socket is closed // This is expected when the socket is closed
if(LOG.isLoggable(INFO)) LOG.info(e.toString()); if(LOG.isLoggable(INFO)) LOG.info(e.toString());
@@ -529,6 +539,7 @@ class TorPlugin implements DuplexPlugin, EventHandler {
Socks5Proxy proxy = new Socks5Proxy("127.0.0.1", SOCKS_PORT); Socks5Proxy proxy = new Socks5Proxy("127.0.0.1", SOCKS_PORT);
proxy.resolveAddrLocally(false); proxy.resolveAddrLocally(false);
Socket s = new SocksSocket(proxy, onion, 80); Socket s = new SocksSocket(proxy, onion, 80);
s.setSoTimeout(socketTimeout);
if(LOG.isLoggable(INFO)) LOG.info("Connected to " + onion); if(LOG.isLoggable(INFO)) LOG.info("Connected to " + onion);
return new TorTransportConnection(this, s); return new TorTransportConnection(this, s);
} catch(IOException e) { } catch(IOException e) {

View File

@@ -19,6 +19,7 @@ public class TorPluginFactory implements DuplexPluginFactory {
private static final int MAX_FRAME_LENGTH = 1024; private static final int MAX_FRAME_LENGTH = 1024;
private static final long MAX_LATENCY = 60 * 1000; // 1 minute private static final long MAX_LATENCY = 60 * 1000; // 1 minute
private static final long MAX_IDLE_TIME = 30 * 1000; // 30 seconds
private static final long POLLING_INTERVAL = 3 * 60 * 1000; // 3 minutes private static final long POLLING_INTERVAL = 3 * 60 * 1000; // 3 minutes
private final Executor ioExecutor; private final Executor ioExecutor;
@@ -43,6 +44,6 @@ public class TorPluginFactory implements DuplexPluginFactory {
return null; return null;
} }
return new TorPlugin(ioExecutor,appContext, locationUtils, callback, return new TorPlugin(ioExecutor,appContext, locationUtils, callback,
MAX_FRAME_LENGTH, MAX_LATENCY, POLLING_INTERVAL); MAX_FRAME_LENGTH, MAX_LATENCY, MAX_IDLE_TIME, POLLING_INTERVAL);
} }
} }

View File

@@ -7,6 +7,9 @@ import org.briarproject.api.plugins.Plugin;
/** An interface for transport plugins that support duplex communication. */ /** An interface for transport plugins that support duplex communication. */
public interface DuplexPlugin extends Plugin { public interface DuplexPlugin extends Plugin {
/** Returns the transport's maximum idle time in milliseconds. */
long getMaxIdleTime();
/** /**
* Attempts to create and return a connection to the given contact using * Attempts to create and return a connection to the given contact using
* the current transport and configuration properties. Returns null if a * the current transport and configuration properties. Returns null if a

View File

@@ -1,5 +1,6 @@
package org.briarproject.messaging; package org.briarproject.messaging;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.logging.Level.INFO; import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING; import static java.util.logging.Level.WARNING;
import static org.briarproject.api.messaging.MessagingConstants.MAX_PACKET_LENGTH; import static org.briarproject.api.messaging.MessagingConstants.MAX_PACKET_LENGTH;
@@ -54,6 +55,7 @@ import org.briarproject.api.messaging.TransportUpdate;
*/ */
class DuplexOutgoingSession implements MessagingSession, EventListener { class DuplexOutgoingSession implements MessagingSession, EventListener {
private static final int MAX_IDLE_TIME = 30 * 1000; // Milliseconds
private static final Logger LOG = private static final Logger LOG =
Logger.getLogger(DuplexOutgoingSession.class.getName()); Logger.getLogger(DuplexOutgoingSession.class.getName());
@@ -108,7 +110,12 @@ class DuplexOutgoingSession implements MessagingSession, EventListener {
while(!interrupted) { while(!interrupted) {
// Flush the stream if it's going to be idle // Flush the stream if it's going to be idle
if(writerTasks.isEmpty()) out.flush(); if(writerTasks.isEmpty()) out.flush();
ThrowingRunnable<IOException> task = writerTasks.take(); ThrowingRunnable<IOException> task = writerTasks.poll(
MAX_IDLE_TIME, MILLISECONDS);
if(task == null) {
LOG.info("Idle timeout");
continue; // Flush and wait again
}
if(task == CLOSE) break; if(task == CLOSE) break;
task.run(); task.run();
} }

View File

@@ -17,8 +17,9 @@ class LanTcpPlugin extends TcpPlugin {
static final TransportId ID = new TransportId("lan"); static final TransportId ID = new TransportId("lan");
LanTcpPlugin(Executor ioExecutor, DuplexPluginCallback callback, LanTcpPlugin(Executor ioExecutor, DuplexPluginCallback callback,
int maxFrameLength, long maxLatency, long pollingInterval) { int maxFrameLength, long maxLatency, long maxIdleTime,
super(ioExecutor, callback, maxFrameLength, maxLatency, long pollingInterval) {
super(ioExecutor, callback, maxFrameLength, maxLatency, maxIdleTime,
pollingInterval); pollingInterval);
} }

View File

@@ -11,6 +11,7 @@ public class LanTcpPluginFactory implements DuplexPluginFactory {
private static final int MAX_FRAME_LENGTH = 1024; private static final int MAX_FRAME_LENGTH = 1024;
private static final long MAX_LATENCY = 60 * 1000; // 1 minute private static final long MAX_LATENCY = 60 * 1000; // 1 minute
private static final long MAX_IDLE_TIME = 30 * 1000; // 30 seconds
private static final long POLLING_INTERVAL = 60 * 1000; // 1 minute private static final long POLLING_INTERVAL = 60 * 1000; // 1 minute
private final Executor ioExecutor; private final Executor ioExecutor;
@@ -25,6 +26,6 @@ public class LanTcpPluginFactory implements DuplexPluginFactory {
public DuplexPlugin createPlugin(DuplexPluginCallback callback) { public DuplexPlugin createPlugin(DuplexPluginCallback callback) {
return new LanTcpPlugin(ioExecutor, callback, MAX_FRAME_LENGTH, return new LanTcpPlugin(ioExecutor, callback, MAX_FRAME_LENGTH,
MAX_LATENCY, POLLING_INTERVAL); MAX_LATENCY, MAX_IDLE_TIME, POLLING_INTERVAL);
} }
} }

View File

@@ -37,8 +37,8 @@ abstract class TcpPlugin implements DuplexPlugin {
protected final Executor ioExecutor; protected final Executor ioExecutor;
protected final DuplexPluginCallback callback; protected final DuplexPluginCallback callback;
protected final int maxFrameLength; protected final int maxFrameLength, socketTimeout;
protected final long maxLatency, pollingInterval; protected final long maxLatency, maxIdleTime, pollingInterval;
protected volatile boolean running = false; protected volatile boolean running = false;
protected volatile ServerSocket socket = null; protected volatile ServerSocket socket = null;
@@ -53,12 +53,17 @@ abstract class TcpPlugin implements DuplexPlugin {
protected abstract boolean isConnectable(InetSocketAddress remote); protected abstract boolean isConnectable(InetSocketAddress remote);
protected TcpPlugin(Executor ioExecutor, DuplexPluginCallback callback, protected TcpPlugin(Executor ioExecutor, DuplexPluginCallback callback,
int maxFrameLength, long maxLatency, long pollingInterval) { int maxFrameLength, long maxLatency, long maxIdleTime,
long pollingInterval) {
this.ioExecutor = ioExecutor; this.ioExecutor = ioExecutor;
this.callback = callback; this.callback = callback;
this.maxFrameLength = maxFrameLength; this.maxFrameLength = maxFrameLength;
this.maxLatency = maxLatency; this.maxLatency = maxLatency;
this.maxIdleTime = maxIdleTime;
this.pollingInterval = pollingInterval; this.pollingInterval = pollingInterval;
if(2 * maxIdleTime > Integer.MAX_VALUE)
socketTimeout = Integer.MAX_VALUE;
else socketTimeout = (int) (2 * maxIdleTime);
} }
public int getMaxFrameLength() { public int getMaxFrameLength() {
@@ -69,6 +74,10 @@ abstract class TcpPlugin implements DuplexPlugin {
return maxLatency; return maxLatency;
} }
public long getMaxIdleTime() {
return maxIdleTime;
}
public boolean start() { public boolean start() {
running = true; running = true;
bind(); bind();
@@ -136,6 +145,7 @@ abstract class TcpPlugin implements DuplexPlugin {
Socket s; Socket s;
try { try {
s = socket.accept(); s = socket.accept();
s.setSoTimeout(socketTimeout);
} catch(IOException e) { } catch(IOException e) {
// This is expected when the socket is closed // This is expected when the socket is closed
if(LOG.isLoggable(INFO)) LOG.info(e.toString()); if(LOG.isLoggable(INFO)) LOG.info(e.toString());
@@ -195,6 +205,7 @@ abstract class TcpPlugin implements DuplexPlugin {
try { try {
if(LOG.isLoggable(INFO)) LOG.info("Connecting to " + remote); if(LOG.isLoggable(INFO)) LOG.info("Connecting to " + remote);
s.connect(remote); s.connect(remote);
s.setSoTimeout(socketTimeout);
if(LOG.isLoggable(INFO)) LOG.info("Connected to " + remote); if(LOG.isLoggable(INFO)) LOG.info("Connected to " + remote);
return new TcpTransportConnection(this, s); return new TcpTransportConnection(this, s);
} catch(IOException e) { } catch(IOException e) {

View File

@@ -21,9 +21,9 @@ class WanTcpPlugin extends TcpPlugin {
private volatile MappingResult mappingResult; private volatile MappingResult mappingResult;
WanTcpPlugin(Executor ioExecutor, DuplexPluginCallback callback, WanTcpPlugin(Executor ioExecutor, DuplexPluginCallback callback,
int maxFrameLength, long maxLatency, long pollingInterval, int maxFrameLength, long maxLatency, long maxIdleTime,
PortMapper portMapper) { long pollingInterval, PortMapper portMapper) {
super(ioExecutor, callback, maxFrameLength, maxLatency, super(ioExecutor, callback, maxFrameLength, maxLatency, maxIdleTime,
pollingInterval); pollingInterval);
this.portMapper = portMapper; this.portMapper = portMapper;
} }

View File

@@ -12,6 +12,7 @@ public class WanTcpPluginFactory implements DuplexPluginFactory {
private static final int MAX_FRAME_LENGTH = 1024; private static final int MAX_FRAME_LENGTH = 1024;
private static final long MAX_LATENCY = 60 * 1000; // 1 minute private static final long MAX_LATENCY = 60 * 1000; // 1 minute
private static final long MAX_IDLE_TIME = 30 * 1000; // 30 seconds
private static final long POLLING_INTERVAL = 5 * 60 * 1000; // 5 minutes private static final long POLLING_INTERVAL = 5 * 60 * 1000; // 5 minutes
private final Executor ioExecutor; private final Executor ioExecutor;
@@ -29,7 +30,7 @@ public class WanTcpPluginFactory implements DuplexPluginFactory {
public DuplexPlugin createPlugin(DuplexPluginCallback callback) { public DuplexPlugin createPlugin(DuplexPluginCallback callback) {
return new WanTcpPlugin(ioExecutor, callback, MAX_FRAME_LENGTH, return new WanTcpPlugin(ioExecutor, callback, MAX_FRAME_LENGTH,
MAX_LATENCY, POLLING_INTERVAL, MAX_LATENCY, MAX_IDLE_TIME, POLLING_INTERVAL,
new PortMapperImpl(shutdownManager)); new PortMapperImpl(shutdownManager));
} }
} }

View File

@@ -42,7 +42,7 @@ class StreamWriterImpl extends OutputStream implements StreamWriter {
@Override @Override
public void flush() throws IOException { public void flush() throws IOException {
if(length > 0) writeFrame(false); writeFrame(false);
out.flush(); out.flush();
} }

View File

@@ -78,6 +78,11 @@ class BluetoothPlugin implements DuplexPlugin {
return maxLatency; return maxLatency;
} }
public long getMaxIdleTime() {
// Bluetooth detects dead connections so we don't need keepalives
return Long.MAX_VALUE;
}
public boolean start() throws IOException { public boolean start() throws IOException {
// Initialise the Bluetooth stack // Initialise the Bluetooth stack
try { try {

View File

@@ -61,6 +61,11 @@ class ModemPlugin implements DuplexPlugin, Modem.Callback {
return maxLatency; return maxLatency;
} }
public long getMaxIdleTime() {
// FIXME: Do we need keepalives for this transport?
return Long.MAX_VALUE;
}
public boolean start() { public boolean start() {
for(String portName : serialPortList.getPortNames()) { for(String portName : serialPortList.getPortNames()) {
if(LOG.isLoggable(INFO)) if(LOG.isLoggable(INFO))

View File

@@ -15,6 +15,11 @@ import org.briarproject.plugins.DuplexClientTest;
// is running on another machine // is running on another machine
public class LanTcpClientTest extends DuplexClientTest { public class LanTcpClientTest extends DuplexClientTest {
private static final int MAX_FRAME_LENGTH = 1024;
private static final int MAX_LATENCY = 60 * 1000;
private static final int MAX_IDLE_TIME = 30 * 1000;
private static final int POLLING_INTERVAL = 60 * 1000;
private LanTcpClientTest(Executor executor, String serverAddress, private LanTcpClientTest(Executor executor, String serverAddress,
String serverPort) { String serverPort) {
// Store the server's internal address and port // Store the server's internal address and port
@@ -22,11 +27,12 @@ public class LanTcpClientTest extends DuplexClientTest {
p.put("address", serverAddress); p.put("address", serverAddress);
p.put("port", serverPort); p.put("port", serverPort);
Map<ContactId, TransportProperties> remote = Map<ContactId, TransportProperties> remote =
Collections.singletonMap(contactId, p); Collections.singletonMap(contactId, p);
// Create the plugin // Create the plugin
callback = new ClientCallback(new TransportConfig(), callback = new ClientCallback(new TransportConfig(),
new TransportProperties(), remote); new TransportProperties(), remote);
plugin = new LanTcpPlugin(executor, callback, 0, 0, 0); plugin = new LanTcpPlugin(executor, callback, MAX_FRAME_LENGTH,
MAX_LATENCY, MAX_IDLE_TIME, POLLING_INTERVAL);
} }
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {

View File

@@ -32,7 +32,7 @@ public class LanTcpPluginTest extends BriarTestCase {
@Test @Test
public void testAddressesAreOnSameLan() { public void testAddressesAreOnSameLan() {
LanTcpPlugin plugin = new LanTcpPlugin(null, null, 0, 0, 0); LanTcpPlugin plugin = new LanTcpPlugin(null, null, 0, 0, 0, 0);
// Local and remote in 10.0.0.0/8 should return true // Local and remote in 10.0.0.0/8 should return true
assertTrue(plugin.addressesAreOnSameLan(makeAddress(10, 0, 0, 0), assertTrue(plugin.addressesAreOnSameLan(makeAddress(10, 0, 0, 0),
makeAddress(10, 255, 255, 255))); makeAddress(10, 255, 255, 255)));
@@ -81,7 +81,7 @@ public class LanTcpPluginTest extends BriarTestCase {
} }
Callback callback = new Callback(); Callback callback = new Callback();
Executor executor = Executors.newCachedThreadPool(); Executor executor = Executors.newCachedThreadPool();
DuplexPlugin plugin = new LanTcpPlugin(executor, callback, 0, 0, 0); DuplexPlugin plugin = new LanTcpPlugin(executor, callback, 0, 0, 0, 0);
plugin.start(); plugin.start();
// The plugin should have bound a socket and stored the port number // The plugin should have bound a socket and stored the port number
assertTrue(callback.propertiesLatch.await(5, SECONDS)); assertTrue(callback.propertiesLatch.await(5, SECONDS));
@@ -113,7 +113,7 @@ public class LanTcpPluginTest extends BriarTestCase {
} }
Callback callback = new Callback(); Callback callback = new Callback();
Executor executor = Executors.newCachedThreadPool(); Executor executor = Executors.newCachedThreadPool();
DuplexPlugin plugin = new LanTcpPlugin(executor, callback, 0, 0, 0); DuplexPlugin plugin = new LanTcpPlugin(executor, callback, 0, 0, 0, 0);
plugin.start(); plugin.start();
// The plugin should have bound a socket and stored the port number // The plugin should have bound a socket and stored the port number
assertTrue(callback.propertiesLatch.await(5, SECONDS)); assertTrue(callback.propertiesLatch.await(5, SECONDS));

View File

@@ -13,11 +13,17 @@ import org.briarproject.plugins.DuplexServerTest;
// is running on another machine // is running on another machine
public class LanTcpServerTest extends DuplexServerTest { public class LanTcpServerTest extends DuplexServerTest {
private static final int MAX_FRAME_LENGTH = 1024;
private static final int MAX_LATENCY = 60 * 1000;
private static final int MAX_IDLE_TIME = 30 * 1000;
private static final int POLLING_INTERVAL = 60 * 1000;
private LanTcpServerTest(Executor executor) { private LanTcpServerTest(Executor executor) {
callback = new ServerCallback(new TransportConfig(), callback = new ServerCallback(new TransportConfig(),
new TransportProperties(), new TransportProperties(),
Collections.singletonMap(contactId, new TransportProperties())); Collections.singletonMap(contactId, new TransportProperties()));
plugin = new LanTcpPlugin(executor, callback, 0, 0, 0); plugin = new LanTcpPlugin(executor, callback, MAX_FRAME_LENGTH,
MAX_LATENCY, MAX_IDLE_TIME, POLLING_INTERVAL);
} }
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {

View File

@@ -31,11 +31,15 @@ public class StreamWriterImplTest extends BriarTestCase {
} }
@Test @Test
public void testFlushWithoutBufferedDataOnlyFlushes() throws Exception { public void testFlushWithoutBufferedDataWritesFrameAndFlushes()
throws Exception {
Mockery context = new Mockery(); Mockery context = new Mockery();
final FrameWriter writer = context.mock(FrameWriter.class); final FrameWriter writer = context.mock(FrameWriter.class);
StreamWriterImpl w = new StreamWriterImpl(writer, FRAME_LENGTH); StreamWriterImpl w = new StreamWriterImpl(writer, FRAME_LENGTH);
context.checking(new Expectations() {{ context.checking(new Expectations() {{
// Write a non-final frame with an empty payload
oneOf(writer).writeFrame(with(any(byte[].class)), with(0),
with(false));
// Flush the stream // Flush the stream
oneOf(writer).flush(); oneOf(writer).flush();
}}); }});