Let the plugin determine whether to flush the output stream after each

packet.
This commit is contained in:
akwizgran
2011-12-08 22:13:35 +00:00
parent 844ae8f0a7
commit 2494ff1a1e
28 changed files with 223 additions and 159 deletions

View File

@@ -10,19 +10,19 @@ public abstract class AbstractPlugin implements Plugin {
protected final Executor pluginExecutor;
protected boolean started = false; // Locking: this
protected boolean running = false; // Locking: this
protected AbstractPlugin(@PluginExecutor Executor pluginExecutor) {
this.pluginExecutor = pluginExecutor;
}
public synchronized void start() throws IOException {
if(started) throw new IllegalStateException();
started = true;
if(running) throw new IllegalStateException();
running = true;
}
public synchronized void stop() throws IOException {
if(!started) throw new IllegalStateException();
started = false;
if(!running) throw new IllegalStateException();
running = false;
}
}

View File

@@ -78,30 +78,17 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin {
}
throw new IOException(e.toString());
}
pluginExecutor.execute(createContactSocketBinder());
}
@Override
public synchronized void stop() throws IOException {
super.stop();
if(socket != null) {
socket.close();
socket = null;
}
}
private Runnable createContactSocketBinder() {
return new Runnable() {
pluginExecutor.execute(new Runnable() {
public void run() {
bindContactSocket();
}
};
});
}
private void bindContactSocket() {
String uuid;
synchronized(this) {
if(!started) return;
if(!running) return;
uuid = getUuid();
makeDeviceDiscoverable();
}
@@ -115,7 +102,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin {
return;
}
synchronized(this) {
if(!started) {
if(!running) {
try {
scn.close();
} catch(IOException e) {
@@ -134,7 +121,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin {
}
private synchronized String getUuid() {
assert started;
assert running;
TransportProperties p = callback.getLocalProperties();
String uuid = p.get("uuid");
if(uuid == null) {
@@ -149,7 +136,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin {
}
private synchronized void makeDeviceDiscoverable() {
assert started;
assert running;
// Try to make the device discoverable (requires root on Linux)
try {
localDevice.setDiscoverable(DiscoveryAgent.GIAC);
@@ -169,7 +156,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin {
StreamConnectionNotifier scn;
StreamConnection s;
synchronized(this) {
if(!started) return;
if(!running) return;
scn = socket;
}
try {
@@ -185,6 +172,15 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin {
}
}
@Override
public synchronized void stop() throws IOException {
super.stop();
if(socket != null) {
socket.close();
socket = null;
}
}
public boolean shouldPoll() {
return true;
}
@@ -193,21 +189,24 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin {
return pollingInterval;
}
public synchronized void poll() {
if(!started) return;
pluginExecutor.execute(createConnectors());
}
private Runnable createConnectors() {
return new Runnable() {
public void poll() {
synchronized(this) {
if(!running) return;
}
pluginExecutor.execute(new Runnable() {
public void run() {
connectAndCallBack();
}
};
});
}
private void connectAndCallBack() {
Map<ContactId, String> discovered = discoverContactUrls();
Map<ContactId, TransportProperties> remote;
synchronized(this) {
if(!running) return;
remote = callback.getRemoteProperties();
}
Map<ContactId, String> discovered = discoverContactUrls(remote);
for(Entry<ContactId, String> e : discovered.entrySet()) {
ContactId c = e.getKey();
String url = e.getValue();
@@ -216,13 +215,12 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin {
}
}
private Map<ContactId, String> discoverContactUrls() {
private Map<ContactId, String> discoverContactUrls(
Map<ContactId, TransportProperties> remote) {
DiscoveryAgent discoveryAgent;
Map<ContactId, TransportProperties> remote;
synchronized(this) {
if(!started) return Collections.emptyMap();
if(!running) return Collections.emptyMap();
discoveryAgent = localDevice.getDiscoveryAgent();
remote = callback.getRemoteProperties();
}
Map<String, ContactId> addresses = new HashMap<String, ContactId>();
Map<ContactId, String> uuids = new HashMap<ContactId, String>();
@@ -236,18 +234,17 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin {
uuids.put(c, uuid);
}
}
if(addresses.isEmpty()) return Collections.emptyMap();
ContactListener listener = new ContactListener(discoveryAgent,
Collections.unmodifiableMap(addresses),
Collections.unmodifiableMap(uuids));
synchronized(discoveryLock) {
try {
discoveryAgent.startInquiry(DiscoveryAgent.GIAC, listener);
return listener.waitForUrls();
} catch(BluetoothStateException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString());
return Collections.emptyMap();
}
try {
return listener.waitForUrls();
} catch(InterruptedException e) {
if(LOG.isLoggable(Level.INFO))
LOG.info("Interrupted while waiting for URLs");
@@ -259,12 +256,10 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin {
private StreamTransportConnection connect(ContactId c, String url) {
synchronized(this) {
if(!started) return null;
if(!running) return null;
}
try {
if(LOG.isLoggable(Level.INFO)) LOG.info("Connecting to " + url);
StreamConnection s = (StreamConnection) Connector.open(url);
if(LOG.isLoggable(Level.INFO)) LOG.info("Connected");
return new BluetoothTransportConnection(s);
} catch(IOException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString());
@@ -273,7 +268,14 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin {
}
public StreamTransportConnection createConnection(ContactId c) {
String url = discoverContactUrls().get(c);
Map<ContactId, TransportProperties> remote;
synchronized(this) {
if(!running) return null;
remote = callback.getRemoteProperties();
}
if(!remote.containsKey(c)) return null;
remote = Collections.singletonMap(c, remote.get(c));
String url = discoverContactUrls(remote).get(c);
return url == null ? null : connect(c, url);
}
@@ -325,7 +327,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin {
private void createInvitationConnection(ConnectionCallback c) {
DiscoveryAgent discoveryAgent;
synchronized(this) {
if(!started) return;
if(!running) return;
discoveryAgent = localDevice.getDiscoveryAgent();
}
// Try to discover the other party until the invitation times out
@@ -350,7 +352,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin {
}
}
synchronized(this) {
if(!started) return;
if(!running) return;
}
}
if(url == null) return;
@@ -365,7 +367,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin {
private void bindInvitationSocket(final ConnectionCallback c) {
synchronized(this) {
if(!started) return;
if(!running) return;
makeDeviceDiscoverable();
}
// Bind the socket
@@ -400,7 +402,7 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin {
private void acceptInvitationConnection(ConnectionCallback c,
StreamConnectionNotifier scn) {
synchronized(this) {
if(!started) return;
if(!running) return;
}
try {
StreamConnection s = scn.acceptAndOpen();

View File

@@ -29,6 +29,10 @@ class BluetoothTransportConnection implements StreamTransportConnection {
return stream.openOutputStream();
}
public boolean shouldFlush() {
return true;
}
public void dispose(boolean exception, boolean recognised) {
try {
stream.close();

View File

@@ -64,7 +64,7 @@ abstract class FilePlugin extends AbstractPlugin implements BatchPlugin {
private BatchTransportWriter createWriter(String filename) {
synchronized(this) {
if(!started) return null;
if(!running) return null;
}
File dir = chooseOutputDirectory();
if(dir == null || !dir.exists() || !dir.isDirectory()) return null;
@@ -86,7 +86,7 @@ abstract class FilePlugin extends AbstractPlugin implements BatchPlugin {
}
protected synchronized void createReaderFromFile(final File f) {
if(!started) return;
if(!running) return;
pluginExecutor.execute(new ReaderCreator(f));
}

View File

@@ -34,6 +34,10 @@ class FileTransportWriter implements BatchTransportWriter {
return out;
}
public boolean shouldFlush() {
return false;
}
public void dispose(boolean exception) {
try {
out.close();

View File

@@ -58,7 +58,7 @@ implements RemovableDriveMonitor.Callback {
}
public long getPollingInterval() {
return 0L;
throw new UnsupportedOperationException();
}
public void poll() {

View File

@@ -53,19 +53,20 @@ class SimpleSocketPlugin extends SocketPlugin {
@Override
protected Socket createClientSocket() throws IOException {
assert started;
assert running;
return new Socket();
}
@Override
protected ServerSocket createServerSocket() throws IOException {
assert started;
assert running;
return new ServerSocket();
}
// Locking: this
@Override
protected synchronized SocketAddress getLocalSocketAddress() {
assert started;
protected SocketAddress getLocalSocketAddress() {
assert running;
SocketAddress addr = createSocketAddress(callback.getLocalProperties());
if(addr == null) {
try {
@@ -87,9 +88,10 @@ class SimpleSocketPlugin extends SocketPlugin {
boolean link = addr.isLinkLocalAddress();
boolean site = addr.isSiteLocalAddress();
if(lan == (link || site)) {
if(LOG.isLoggable(Level.INFO))
if(LOG.isLoggable(Level.INFO)) {
LOG.info("Choosing interface "
+ addr.getHostAddress());
}
return addr;
}
}
@@ -99,9 +101,10 @@ class SimpleSocketPlugin extends SocketPlugin {
for(NetworkInterface iface : ifaces) {
for(InetAddress addr : Collections.list(iface.getInetAddresses())) {
if(!addr.isLoopbackAddress()) {
if(LOG.isLoggable(Level.INFO))
if(LOG.isLoggable(Level.INFO)) {
LOG.info("Accepting interface "
+ addr.getHostAddress());
}
return addr;
}
}
@@ -109,16 +112,17 @@ class SimpleSocketPlugin extends SocketPlugin {
throw new IOException("No suitable interfaces");
}
// Locking: this
@Override
protected synchronized SocketAddress getRemoteSocketAddress(ContactId c) {
assert started;
protected SocketAddress getRemoteSocketAddress(ContactId c) {
assert running;
TransportProperties p = callback.getRemoteProperties().get(c);
return p == null ? null : createSocketAddress(p);
}
private synchronized SocketAddress createSocketAddress(
TransportProperties p) {
assert started;
// Locking: this
private SocketAddress createSocketAddress(TransportProperties p) {
assert running;
assert p != null;
String host = p.get("external");
if(host == null) host = p.get("internal");
@@ -133,9 +137,10 @@ class SimpleSocketPlugin extends SocketPlugin {
return new InetSocketAddress(host, port);
}
// Locking: this
@Override
protected synchronized void setLocalSocketAddress(SocketAddress s) {
assert started;
protected void setLocalSocketAddress(SocketAddress s) {
assert running;
if(!(s instanceof InetSocketAddress))
throw new IllegalArgumentException();
InetSocketAddress i = (InetSocketAddress) s;

View File

@@ -4,11 +4,13 @@ import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.sf.briar.api.ContactId;
import net.sf.briar.api.TransportProperties;
import net.sf.briar.api.plugins.PluginExecutor;
import net.sf.briar.api.plugins.StreamPlugin;
import net.sf.briar.api.plugins.StreamPluginCallback;
@@ -54,7 +56,7 @@ abstract class SocketPlugin extends AbstractPlugin implements StreamPlugin {
ServerSocket ss = null;
try {
synchronized(this) {
if(!started) return;
if(!running) return;
addr = getLocalSocketAddress();
ss = createServerSocket();
if(addr == null || ss == null) return;
@@ -77,7 +79,7 @@ abstract class SocketPlugin extends AbstractPlugin implements StreamPlugin {
return;
}
synchronized(this) {
if(!started) {
if(!running) {
try {
ss.close();
} catch(IOException e) {
@@ -93,7 +95,7 @@ abstract class SocketPlugin extends AbstractPlugin implements StreamPlugin {
while(true) {
Socket s;
synchronized(this) {
if(!started) return;
if(!running) return;
}
try {
s = ss.accept();
@@ -119,21 +121,19 @@ abstract class SocketPlugin extends AbstractPlugin implements StreamPlugin {
if(socket != null) socket.close();
}
public synchronized void poll() {
// Subclasses may not support polling
if(!shouldPoll()) throw new UnsupportedOperationException();
if(!started) return;
for(ContactId c : callback.getRemoteProperties().keySet()) {
pluginExecutor.execute(createConnector(c));
public void poll() {
Map<ContactId, TransportProperties> remote;
synchronized(this) {
if(!running) return;
remote = callback.getRemoteProperties();
}
for(final ContactId c : remote.keySet()) {
pluginExecutor.execute(new Runnable() {
public void run() {
connectAndCallBack(c);
}
});
}
}
private Runnable createConnector(final ContactId c) {
return new Runnable() {
public void run() {
connectAndCallBack(c);
}
};
}
private void connectAndCallBack(ContactId c) {
@@ -146,7 +146,7 @@ abstract class SocketPlugin extends AbstractPlugin implements StreamPlugin {
Socket s;
try {
synchronized(this) {
if(!started) return null;
if(!running) return null;
addr = getRemoteSocketAddress(c);
s = createClientSocket();
if(addr == null || s == null) return null;

View File

@@ -28,6 +28,10 @@ class SocketTransportConnection implements StreamTransportConnection {
return socket.getOutputStream();
}
public boolean shouldFlush() {
return true;
}
public void dispose(boolean exception, boolean recognised) {
try {
socket.close();

View File

@@ -21,7 +21,8 @@ class ProtocolWriterFactoryImpl implements ProtocolWriterFactory {
this.writerFactory = writerFactory;
}
public ProtocolWriter createProtocolWriter(OutputStream out) {
return new ProtocolWriterImpl(serial, writerFactory, out);
public ProtocolWriter createProtocolWriter(OutputStream out,
boolean flush) {
return new ProtocolWriterImpl(serial, writerFactory, out, flush);
}
}

View File

@@ -27,12 +27,14 @@ class ProtocolWriterImpl implements ProtocolWriter {
private final SerialComponent serial;
private final OutputStream out;
private final boolean flush;
private final Writer w;
ProtocolWriterImpl(SerialComponent serial, WriterFactory writerFactory,
OutputStream out) {
OutputStream out, boolean flush) {
this.serial = serial;
this.out = out;
this.flush = flush;
w = writerFactory.createWriter(out);
}
@@ -67,6 +69,7 @@ class ProtocolWriterImpl implements ProtocolWriter {
w.writeListStart();
for(BatchId b : a.getBatchIds()) w.writeBytes(b.getBytes());
w.writeListEnd();
if(flush) out.flush();
}
public void writeBatch(RawBatch b) throws IOException {
@@ -74,6 +77,7 @@ class ProtocolWriterImpl implements ProtocolWriter {
w.writeListStart();
for(byte[] raw : b.getMessages()) out.write(raw);
w.writeListEnd();
if(flush) out.flush();
}
public void writeOffer(Offer o) throws IOException {
@@ -81,6 +85,7 @@ class ProtocolWriterImpl implements ProtocolWriter {
w.writeListStart();
for(MessageId m : o.getMessageIds()) w.writeBytes(m.getBytes());
w.writeListEnd();
if(flush) out.flush();
}
public void writeRequest(Request r) throws IOException {
@@ -100,6 +105,7 @@ class ProtocolWriterImpl implements ProtocolWriter {
w.writeStructId(Types.REQUEST);
w.writeUint7((byte) (bytes * 8 - length));
w.writeBytes(bitmap);
if(flush) out.flush();
}
public void writeSubscriptionUpdate(SubscriptionUpdate s)
@@ -112,6 +118,7 @@ class ProtocolWriterImpl implements ProtocolWriter {
}
w.writeMapEnd();
w.writeInt64(s.getTimestamp());
if(flush) out.flush();
}
private void writeGroup(Writer w, Group g) throws IOException {
@@ -133,5 +140,10 @@ class ProtocolWriterImpl implements ProtocolWriter {
}
w.writeListEnd();
w.writeInt64(t.getTimestamp());
if(flush) out.flush();
}
public void flush() throws IOException {
out.flush();
}
}

View File

@@ -55,7 +55,8 @@ class OutgoingBatchConnection {
transport.getOutputStream(), transport.getCapacity(),
ctx.getSecret());
OutputStream out = conn.getOutputStream();
ProtocolWriter writer = protoFactory.createProtocolWriter(out);
ProtocolWriter writer = protoFactory.createProtocolWriter(out,
transport.shouldFlush());
// There should be enough space for a packet
long capacity = conn.getRemainingCapacity();
if(capacity < MAX_PACKET_LENGTH) throw new EOFException();
@@ -88,8 +89,7 @@ class OutgoingBatchConnection {
capacity = writer.getMessageCapacityForBatch(capacity);
b = db.generateBatch(contactId, (int) capacity);
}
// Flush the output stream
out.flush();
writer.flush();
transport.dispose(false);
} catch(DbException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString());

View File

@@ -26,11 +26,11 @@ class IncomingStreamConnection extends StreamConnection {
ConnectionWriterFactory connWriterFactory,
ProtocolReaderFactory protoReaderFactory,
ProtocolWriterFactory protoWriterFactory,
ConnectionContext ctx, StreamTransportConnection connection,
ConnectionContext ctx, StreamTransportConnection transport,
byte[] tag) {
super(dbExecutor, verificationExecutor, db, connReaderFactory,
connWriterFactory, protoReaderFactory, protoWriterFactory,
ctx.getContactId(), connection);
ctx.getContactId(), transport);
this.ctx = ctx;
this.tag = tag;
}

View File

@@ -31,10 +31,10 @@ class OutgoingStreamConnection extends StreamConnection {
ProtocolReaderFactory protoReaderFactory,
ProtocolWriterFactory protoWriterFactory, ContactId contactId,
TransportIndex transportIndex,
StreamTransportConnection connection) {
StreamTransportConnection transport) {
super(dbExecutor, verificationExecutor, db, connReaderFactory,
connWriterFactory, protoReaderFactory, protoWriterFactory,
contactId, connection);
contactId, transport);
this.transportIndex = transportIndex;
}

View File

@@ -191,7 +191,8 @@ abstract class StreamConnection implements DatabaseListener {
try {
db.addListener(this);
OutputStream out = createConnectionWriter().getOutputStream();
writer = protoWriterFactory.createProtocolWriter(out);
writer = protoWriterFactory.createProtocolWriter(out,
transport.shouldFlush());
// Send the initial packets: transports, subs, acks, offer
dbExecutor.execute(new GenerateTransportUpdate());
dbExecutor.execute(new GenerateSubscriptionUpdate());
@@ -203,6 +204,7 @@ abstract class StreamConnection implements DatabaseListener {
if(task == CLOSE) break;
task.run();
}
writer.flush();
if(!disposed.getAndSet(true)) transport.dispose(false, true);
} catch(DbException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString());

View File

@@ -77,53 +77,58 @@ class ConnectionDispatcherImpl implements ConnectionDispatcher {
private class DispatchBatchConnection implements Runnable {
private final TransportId t;
private final BatchTransportReader r;
private final TransportId transportId;
private final BatchTransportReader transport;
private DispatchBatchConnection(TransportId t, BatchTransportReader r) {
this.t = t;
this.r = r;
private DispatchBatchConnection(TransportId transportId,
BatchTransportReader transport) {
this.transportId = transportId;
this.transport = transport;
}
public void run() {
try {
byte[] tag = readTag(r.getInputStream());
ConnectionContext ctx = recogniser.acceptConnection(t, tag);
if(ctx == null) r.dispose(false, false);
else batchConnFactory.createIncomingConnection(ctx, r, tag);
byte[] tag = readTag(transport.getInputStream());
ConnectionContext ctx = recogniser.acceptConnection(transportId,
tag);
if(ctx == null) transport.dispose(false, false);
else batchConnFactory.createIncomingConnection(ctx, transport,
tag);
} catch(DbException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString());
r.dispose(true, false);
transport.dispose(true, false);
} catch(IOException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString());
r.dispose(true, false);
transport.dispose(true, false);
}
}
}
private class DispatchStreamConnection implements Runnable {
private final TransportId t;
private final StreamTransportConnection s;
private final TransportId transportId;
private final StreamTransportConnection transport;
private DispatchStreamConnection(TransportId t,
StreamTransportConnection s) {
this.t = t;
this.s = s;
private DispatchStreamConnection(TransportId transportId,
StreamTransportConnection transport) {
this.transportId = transportId;
this.transport = transport;
}
public void run() {
try {
byte[] tag = readTag(s.getInputStream());
ConnectionContext ctx = recogniser.acceptConnection(t, tag);
if(ctx == null) s.dispose(false, false);
else streamConnFactory.createIncomingConnection(ctx, s, tag);
byte[] tag = readTag(transport.getInputStream());
ConnectionContext ctx = recogniser.acceptConnection(transportId,
tag);
if(ctx == null) transport.dispose(false, false);
else streamConnFactory.createIncomingConnection(ctx, transport,
tag);
} catch(DbException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString());
s.dispose(true, false);
transport.dispose(true, false);
} catch(IOException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.toString());
s.dispose(true, false);
transport.dispose(true, false);
}
}
}