Merge branch 'improve-thread-encapsulation' into upstream

This commit is contained in:
Abraham Kiggundu
2015-01-05 11:07:55 +03:00
21 changed files with 1034 additions and 522 deletions

View File

@@ -4,6 +4,10 @@ import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
class PollingRemovableDriveMonitor implements RemovableDriveMonitor, Runnable {
@@ -14,11 +18,14 @@ class PollingRemovableDriveMonitor implements RemovableDriveMonitor, Runnable {
private final Executor ioExecutor;
private final RemovableDriveFinder finder;
private final long pollingInterval;
private final Object pollingLock = new Object();
private volatile boolean running = false;
private volatile Callback callback = null;
private final Lock synchLock = new ReentrantLock();
private final Condition stopPolling = synchLock.newCondition();
public PollingRemovableDriveMonitor(Executor ioExecutor,
RemovableDriveFinder finder, long pollingInterval) {
this.ioExecutor = ioExecutor;
@@ -34,8 +41,12 @@ class PollingRemovableDriveMonitor implements RemovableDriveMonitor, Runnable {
public void stop() throws IOException {
running = false;
synchronized(pollingLock) {
pollingLock.notifyAll();
synchLock.lock();
try {
stopPolling.signalAll();
}
finally {
synchLock.unlock();
}
}
@@ -43,8 +54,12 @@ class PollingRemovableDriveMonitor implements RemovableDriveMonitor, Runnable {
try {
Collection<File> drives = finder.findRemovableDrives();
while(running) {
synchronized(pollingLock) {
pollingLock.wait(pollingInterval);
synchLock.lock();
try {
stopPolling.await(pollingInterval, TimeUnit.MILLISECONDS);
}
finally{
synchLock.unlock();
}
if(!running) return;
Collection<File> newDrives = finder.findRemovableDrives();

View File

@@ -4,6 +4,9 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import net.contentobjects.jnotify.JNotify;
import net.contentobjects.jnotify.JNotifyListener;
@@ -11,17 +14,20 @@ import net.contentobjects.jnotify.JNotifyListener;
abstract class UnixRemovableDriveMonitor implements RemovableDriveMonitor,
JNotifyListener {
private static boolean triedLoad = false; // Locking: class
private static Throwable loadError = null; // Locking: class
private static boolean triedLoad = false;
private static Throwable loadError = null;
// Locking: this
private final List<Integer> watches = new ArrayList<Integer>();
private boolean started = false; // Locking: this
private Callback callback = null; // Locking: this
private boolean started = false;
private Callback callback = null;
protected abstract String[] getPathsToWatch();
//TODO: rationalise this in a further refactor
private final Lock synchLock = new ReentrantLock();
private static final Lock staticSynchLock = new ReentrantLock();
private static Throwable tryLoad() {
try {
Class.forName("net.contentobjects.jnotify.JNotify");
@@ -33,12 +39,18 @@ JNotifyListener {
}
}
public static synchronized void checkEnabled() throws IOException {
if(!triedLoad) {
loadError = tryLoad();
triedLoad = true;
public static void checkEnabled() throws IOException {
staticSynchLock.lock();
try {
if(!triedLoad) {
loadError = tryLoad();
triedLoad = true;
}
if(loadError != null) throw new IOException(loadError.toString());
}
finally{
staticSynchLock.unlock();
}
if(loadError != null) throw new IOException(loadError.toString());
}
public void start(Callback callback) throws IOException {
@@ -49,34 +61,46 @@ JNotifyListener {
if(new File(path).exists())
watches.add(JNotify.addWatch(path, mask, false, this));
}
synchronized(this) {
assert !started;
assert this.callback == null;
started = true;
this.callback = callback;
this.watches.addAll(watches);
}
synchLock.lock();
try {
assert !started;
assert this.callback == null;
started = true;
this.callback = callback;
this.watches.addAll(watches);
}
finally{
synchLock.unlock();
}
}
public void stop() throws IOException {
checkEnabled();
List<Integer> watches;
synchronized(this) {
assert started;
assert callback != null;
started = false;
callback = null;
watches = new ArrayList<Integer>(this.watches);
this.watches.clear();
}
synchLock.lock();
try {
assert started;
assert callback != null;
started = false;
callback = null;
watches = new ArrayList<Integer>(this.watches);
this.watches.clear();
}
finally{
synchLock.unlock();
}
for(Integer w : watches) JNotify.removeWatch(w);
}
public void fileCreated(int wd, String rootPath, String name) {
Callback callback;
synchronized(this) {
callback = this.callback;
}
synchLock.lock();
try {
callback = this.callback;
}
finally{
synchLock.unlock();
}
if(callback != null)
callback.driveInserted(new File(rootPath + "/" + name));
}

View File

@@ -10,6 +10,10 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
import jssc.SerialPortEvent;
@@ -42,8 +46,13 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener {
private int lineLen = 0;
private ReliabilityLayer reliability = null; // Locking: this
private boolean initialised = false, connected = false; // Locking: this
private ReliabilityLayer reliability = null;
private boolean initialised = false, connected = false;
private final Lock synchLock = new ReentrantLock();
private final Condition connectedStateChanged = synchLock.newCondition();
private final Condition initialisedStateChanged = synchLock.newCondition();
ModemImpl(Executor ioExecutor, ReliabilityLayerFactory reliabilityFactory,
Clock clock, Callback callback, SerialPort port) {
@@ -91,14 +100,18 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener {
// Wait for the event thread to receive "OK"
boolean success = false;
try {
synchronized(this) {
synchLock.lock();
try {
long now = clock.currentTimeMillis();
long end = now + OK_TIMEOUT;
while(now < end && !initialised) {
wait(end - now);
initialisedStateChanged.await(end - now, TimeUnit.MILLISECONDS);
now = clock.currentTimeMillis();
}
success = initialised;
}
finally{
synchLock.unlock();
}
} catch(InterruptedException e) {
tryToClose(port);
@@ -123,11 +136,16 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener {
public void stop() throws IOException {
LOG.info("Stopping");
// Wake any threads that are waiting to connect
synchronized(this) {
synchLock.lock();
try {
// Wake any threads that are waiting to connect
initialised = false;
connected = false;
notifyAll();
initialisedStateChanged.signalAll();
connectedStateChanged.signalAll();
}
finally{
synchLock.unlock();
}
// Hang up if necessary and close the port
try {
@@ -145,10 +163,10 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener {
}
}
// Locking: stateChange
private void hangUpInner() throws IOException {
ReliabilityLayer reliability;
synchronized(this) {
synchLock.lock();
try {
if(this.reliability == null) {
LOG.info("Not hanging up - already on the hook");
return;
@@ -156,6 +174,9 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener {
reliability = this.reliability;
this.reliability = null;
connected = false;
}
finally{
synchLock.unlock();
}
reliability.stop();
LOG.info("Hanging up");
@@ -182,7 +203,8 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener {
try {
ReliabilityLayer reliability =
reliabilityFactory.createReliabilityLayer(this);
synchronized(this) {
synchLock.lock();
try {
if(!initialised) {
LOG.info("Not dialling - modem not initialised");
return false;
@@ -192,6 +214,9 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener {
return false;
}
this.reliability = reliability;
}
finally{
synchLock.unlock();
}
reliability.start();
LOG.info("Dialling");
@@ -204,14 +229,18 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener {
}
// Wait for the event thread to receive "CONNECT"
try {
synchronized(this) {
synchLock.lock();
try {
long now = clock.currentTimeMillis();
long end = now + CONNECT_TIMEOUT;
while(now < end && initialised && !connected) {
wait(end - now);
connectedStateChanged.await(end - now, TimeUnit.MILLISECONDS);
now = clock.currentTimeMillis();
}
if(connected) return true;
}
finally{
synchLock.unlock();
}
} catch(InterruptedException e) {
tryToClose(port);
@@ -227,8 +256,12 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener {
public InputStream getInputStream() throws IOException {
ReliabilityLayer reliability;
synchronized(this) {
synchLock.lock();
try {
reliability = this.reliability;
}
finally{
synchLock.unlock();
}
if(reliability == null) throw new IOException("Not connected");
return reliability.getInputStream();
@@ -236,8 +269,12 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener {
public OutputStream getOutputStream() throws IOException {
ReliabilityLayer reliability;
synchronized(this) {
synchLock.lock();
try {
reliability = this.reliability;
}
finally{
synchLock.unlock();
}
if(reliability == null) throw new IOException("Not connected");
return reliability.getOutputStream();
@@ -288,8 +325,12 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener {
private boolean handleData(byte[] b) throws IOException {
ReliabilityLayer reliability;
synchronized(this) {
synchLock.lock();
try {
reliability = this.reliability;
}
finally{
synchLock.unlock();
}
if(reliability == null) return false;
reliability.handleRead(b);
@@ -309,9 +350,13 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener {
lineLen = 0;
if(LOG.isLoggable(INFO)) LOG.info("Modem status: " + s);
if(s.startsWith("CONNECT")) {
synchronized(this) {
synchLock.lock();
try {
connected = true;
notifyAll();
connectedStateChanged.signalAll();
}
finally{
synchLock.unlock();
}
// There might be data in the buffer as well as text
int off = i + 1;
@@ -323,14 +368,22 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener {
return;
} else if(s.equals("BUSY") || s.equals("NO DIALTONE")
|| s.equals("NO CARRIER")) {
synchronized(this) {
synchLock.lock();
try {
connected = false;
notifyAll();
connectedStateChanged.signalAll();
}
finally{
synchLock.unlock();
}
} else if(s.equals("OK")) {
synchronized(this) {
synchLock.lock();
try {
initialised = true;
notifyAll();
initialisedStateChanged.signalAll();
}
finally{
synchLock.unlock();
}
} else if(s.equals("RING")) {
ioExecutor.execute(new Runnable() {
@@ -358,7 +411,8 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener {
try {
ReliabilityLayer reliability =
reliabilityFactory.createReliabilityLayer(this);
synchronized(this) {
synchLock.lock();
try {
if(!initialised) {
LOG.info("Not answering - modem not initialised");
return;
@@ -368,6 +422,9 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener {
return;
}
this.reliability = reliability;
}
finally{
synchLock.unlock();
}
reliability.start();
LOG.info("Answering");
@@ -380,14 +437,18 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener {
// Wait for the event thread to receive "CONNECT"
boolean success = false;
try {
synchronized(this) {
synchLock.lock();
try {
long now = clock.currentTimeMillis();
long end = now + CONNECT_TIMEOUT;
while(now < end && initialised && !connected) {
wait(end - now);
connectedStateChanged.await(end - now, TimeUnit.MILLISECONDS);
now = clock.currentTimeMillis();
}
success = connected;
}
finally{
synchLock.unlock();
}
} catch(InterruptedException e) {
tryToClose(port);