Use a fresh receive queue for each call.

This prevents a caller from getting stale data and/or a stale EOF marker
if a previous caller didn't consume the queue.
This commit is contained in:
akwizgran
2012-11-26 16:16:36 +00:00
parent 98f1f26fcf
commit 2a293b5018

View File

@@ -36,11 +36,12 @@ class ModemImpl implements Modem, SerialPortEventListener {
private final SerialPort port; private final SerialPort port;
private final AtomicBoolean initialised, connected; private final AtomicBoolean initialised, connected;
private final Semaphore offHook; private final Semaphore offHook;
private final BlockingQueue<byte[]> received;
private final byte[] line; private final byte[] line;
private int lineLen = 0; private int lineLen = 0;
private volatile BlockingQueue<byte[]> received = null;
ModemImpl(Executor executor, Callback callback, String portName) { ModemImpl(Executor executor, Callback callback, String portName) {
this.executor = executor; this.executor = executor;
this.callback = callback; this.callback = callback;
@@ -48,7 +49,6 @@ class ModemImpl implements Modem, SerialPortEventListener {
initialised = new AtomicBoolean(false); initialised = new AtomicBoolean(false);
offHook = new Semaphore(1); offHook = new Semaphore(1);
connected = new AtomicBoolean(false); connected = new AtomicBoolean(false);
received = new LinkedBlockingQueue<byte[]>();
line = new byte[MAX_LINE_LENGTH]; line = new byte[MAX_LINE_LENGTH];
} }
@@ -92,6 +92,7 @@ class ModemImpl implements Modem, SerialPortEventListener {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw new IOException("Interrupted while initialising modem"); throw new IOException("Interrupted while initialising modem");
} }
received = new LinkedBlockingQueue<byte[]>();
} }
public boolean dial(String number) throws IOException { public boolean dial(String number) throws IOException {
@@ -124,7 +125,7 @@ class ModemImpl implements Modem, SerialPortEventListener {
} }
public InputStream getInputStream() { public InputStream getInputStream() {
return new ModemInputStream(); return new ModemInputStream(received);
} }
public OutputStream getOutputStream() { public OutputStream getOutputStream() {
@@ -140,6 +141,7 @@ class ModemImpl implements Modem, SerialPortEventListener {
throw new IOException(e.toString()); throw new IOException(e.toString());
} }
received.add(new byte[0]); // Empty buffer indicates EOF received.add(new byte[0]); // Empty buffer indicates EOF
received = new LinkedBlockingQueue<byte[]>();
connected.set(false); connected.set(false);
offHook.release(); offHook.release();
} }
@@ -246,9 +248,15 @@ class ModemImpl implements Modem, SerialPortEventListener {
private class ModemInputStream extends InputStream { private class ModemInputStream extends InputStream {
private final BlockingQueue<byte[]> received;
private byte[] buf = null; private byte[] buf = null;
private int offset = 0; private int offset = 0;
private ModemInputStream(BlockingQueue<byte[]> received) {
this.received = received;
}
@Override @Override
public int read() throws IOException { public int read() throws IOException {
getBufferIfNecessary(); getBufferIfNecessary();