Replaced redundant SocketReceiver classes with a generic class.

This commit is contained in:
akwizgran
2013-06-13 12:40:19 +01:00
parent 3833eac4e2
commit a17349e015
3 changed files with 60 additions and 77 deletions

View File

@@ -22,8 +22,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
import net.sf.briar.api.ContactId;
@@ -35,6 +33,7 @@ import net.sf.briar.api.crypto.PseudoRandom;
import net.sf.briar.api.plugins.duplex.DuplexPlugin;
import net.sf.briar.api.plugins.duplex.DuplexPluginCallback;
import net.sf.briar.api.plugins.duplex.DuplexTransportConnection;
import net.sf.briar.util.LatchedReference;
import net.sf.briar.util.StringUtils;
import android.bluetooth.BluetoothAdapter;
import android.bluetooth.BluetoothDevice;
@@ -361,12 +360,13 @@ class DroidtoothPlugin implements DuplexPlugin {
return null;
}
// Start the background threads
SocketReceiver receiver = new SocketReceiver();
new DiscoveryThread(receiver, uuid.toString(), timeout).start();
new BluetoothListenerThread(receiver, ss).start();
LatchedReference<BluetoothSocket> socketLatch =
new LatchedReference<BluetoothSocket>();
new DiscoveryThread(socketLatch, uuid.toString(), timeout).start();
new BluetoothListenerThread(socketLatch, ss).start();
// Wait for an incoming or outgoing connection
try {
BluetoothSocket s = receiver.waitForSocket(timeout);
BluetoothSocket s = socketLatch.waitForReference(timeout);
if(s != null) return new DroidtoothTransportConnection(this, s);
} catch(InterruptedException e) {
if(LOG.isLoggable(INFO))
@@ -409,40 +409,15 @@ class DroidtoothPlugin implements DuplexPlugin {
}
}
private static class SocketReceiver {
private final CountDownLatch latch = new CountDownLatch(1);
private final AtomicReference<BluetoothSocket> socket =
new AtomicReference<BluetoothSocket>();
private boolean hasSocket() {
return socket.get() != null;
}
private boolean setSocket(BluetoothSocket s) {
if(socket.compareAndSet(null, s)) {
latch.countDown();
return true;
}
return false;
}
private BluetoothSocket waitForSocket(long timeout)
throws InterruptedException {
latch.await(timeout, TimeUnit.MILLISECONDS);
return socket.get();
}
}
private class DiscoveryThread extends Thread {
private final SocketReceiver receiver;
private final LatchedReference<BluetoothSocket> socketLatch;
private final String uuid;
private final long timeout;
private DiscoveryThread(SocketReceiver receiver, String uuid,
long timeout) {
this.receiver = receiver;
private DiscoveryThread(LatchedReference<BluetoothSocket> socketLatch,
String uuid, long timeout) {
this.socketLatch = socketLatch;
this.uuid = uuid;
this.timeout = timeout;
}
@@ -451,7 +426,7 @@ class DroidtoothPlugin implements DuplexPlugin {
public void run() {
long now = clock.currentTimeMillis();
long end = now + timeout;
while(now < end && running && !receiver.hasSocket()) {
while(now < end && running && !socketLatch.isSet()) {
// Discover nearby devices
if(LOG.isLoggable(INFO)) LOG.info("Discovering nearby devices");
List<String> addresses;
@@ -465,12 +440,12 @@ class DroidtoothPlugin implements DuplexPlugin {
// Connect to any device with the right UUID
for(String address : addresses) {
now = clock.currentTimeMillis();
if(now < end && running && !receiver.hasSocket()) {
if(now < end && running && !socketLatch.isSet()) {
BluetoothSocket s = connect(address, uuid);
if(s == null) continue;
if(LOG.isLoggable(INFO))
LOG.info("Outgoing connection");
if(!receiver.setSocket(s)) {
if(!socketLatch.set(s)) {
if(LOG.isLoggable(INFO))
LOG.info("Closing redundant connection");
tryToClose(s);
@@ -519,12 +494,13 @@ class DroidtoothPlugin implements DuplexPlugin {
private static class BluetoothListenerThread extends Thread {
private final SocketReceiver receiver;
private final LatchedReference<BluetoothSocket> socketLatch;
private final BluetoothServerSocket serverSocket;
private BluetoothListenerThread(SocketReceiver receiver,
private BluetoothListenerThread(
LatchedReference<BluetoothSocket> socketLatch,
BluetoothServerSocket serverSocket) {
this.receiver = receiver;
this.socketLatch = socketLatch;
this.serverSocket = serverSocket;
}
@@ -533,7 +509,7 @@ class DroidtoothPlugin implements DuplexPlugin {
try {
BluetoothSocket s = serverSocket.accept();
if(LOG.isLoggable(INFO)) LOG.info("Incoming connection");
if(!receiver.setSocket(s)) {
if(!socketLatch.set(s)) {
if(LOG.isLoggable(INFO))
LOG.info("Closing redundant connection");
s.close();

View File

@@ -18,10 +18,7 @@ import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
import net.sf.briar.api.TransportId;
@@ -31,6 +28,7 @@ import net.sf.briar.api.crypto.PseudoRandom;
import net.sf.briar.api.plugins.duplex.DuplexPluginCallback;
import net.sf.briar.api.plugins.duplex.DuplexTransportConnection;
import net.sf.briar.util.ByteUtils;
import net.sf.briar.util.LatchedReference;
import net.sf.briar.util.StringUtils;
/** A socket plugin that supports exchanging invitations over a LAN. */
@@ -151,9 +149,9 @@ class LanTcpPlugin extends TcpPlugin {
return null;
}
// Start the listener threads
SocketReceiver receiver = new SocketReceiver();
new MulticastListenerThread(receiver, ms, iface).start();
new TcpListenerThread(receiver, ss).start();
LatchedReference<Socket> socketLatch = new LatchedReference<Socket>();
new MulticastListenerThread(socketLatch, ms, iface).start();
new TcpListenerThread(socketLatch, ss).start();
// Send packets until a connection is made or we run out of time
byte[] buffer = new byte[2];
ByteUtils.writeUint16(ss.getLocalPort(), buffer, 0);
@@ -169,7 +167,7 @@ class LanTcpPlugin extends TcpPlugin {
ms.send(packet);
// Wait for an incoming or outgoing connection
try {
Socket s = receiver.waitForSocket(MULTICAST_INTERVAL);
Socket s = socketLatch.waitForReference(MULTICAST_INTERVAL);
if(s != null) return new TcpTransportConnection(this, s);
} catch(InterruptedException e) {
if(LOG.isLoggable(INFO))
@@ -252,35 +250,15 @@ class LanTcpPlugin extends TcpPlugin {
return sendInvitation(r, timeout);
}
private static class SocketReceiver {
private final CountDownLatch latch = new CountDownLatch(1);
private final AtomicReference<Socket> socket =
new AtomicReference<Socket>();
private boolean setSocket(Socket s) {
if(socket.compareAndSet(null, s)) {
latch.countDown();
return true;
}
return false;
}
private Socket waitForSocket(long timeout) throws InterruptedException {
latch.await(timeout, TimeUnit.MILLISECONDS);
return socket.get();
}
}
private class MulticastListenerThread extends Thread {
private final SocketReceiver receiver;
private final LatchedReference<Socket> socketLatch;
private final MulticastSocket multicastSocket;
private final InetAddress localAddress;
private MulticastListenerThread(SocketReceiver receiver,
private MulticastListenerThread(LatchedReference<Socket> socketLatch,
MulticastSocket multicastSocket, InetAddress localAddress) {
this.receiver = receiver;
this.socketLatch = socketLatch;
this.multicastSocket = multicastSocket;
this.localAddress = localAddress;
}
@@ -329,7 +307,7 @@ class LanTcpPlugin extends TcpPlugin {
// Connect back on the advertised TCP port
Socket s = new Socket(addr, port);
if(LOG.isLoggable(INFO)) LOG.info("Outgoing connection");
if(!receiver.setSocket(s)) {
if(!socketLatch.set(s)) {
if(LOG.isLoggable(INFO))
LOG.info("Closing redundant connection");
s.close();
@@ -342,12 +320,12 @@ class LanTcpPlugin extends TcpPlugin {
private class TcpListenerThread extends Thread {
private final SocketReceiver receiver;
private final LatchedReference<Socket> socketLatch;
private final ServerSocket serverSocket;
private TcpListenerThread(SocketReceiver receiver,
private TcpListenerThread(LatchedReference<Socket> socketLatch,
ServerSocket serverSocket) {
this.receiver = receiver;
this.socketLatch = socketLatch;
this.serverSocket = serverSocket;
}
@@ -359,7 +337,7 @@ class LanTcpPlugin extends TcpPlugin {
try {
Socket s = serverSocket.accept();
if(LOG.isLoggable(INFO)) LOG.info("Incoming connection");
if(!receiver.setSocket(s)) {
if(!socketLatch.set(s)) {
if(LOG.isLoggable(INFO))
LOG.info("Closing redundant connection");
s.close();

View File

@@ -0,0 +1,29 @@
package net.sf.briar.util;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class LatchedReference<T> {
private final CountDownLatch latch = new CountDownLatch(1);
private final AtomicReference<T> reference = new AtomicReference<T>();
public boolean isSet() {
return reference.get() != null;
}
public boolean set(T t) {
if(t == null) throw new IllegalArgumentException();
if(reference.compareAndSet(null, t)) {
latch.countDown();
return true;
}
return false;
}
public T waitForReference(long timeout) throws InterruptedException {
latch.await(timeout, TimeUnit.MILLISECONDS);
return reference.get();
}
}