Use a BlockingQueue to simplify synchronisation.

This commit is contained in:
akwizgran
2011-12-07 23:38:43 +00:00
parent c728377ae1
commit b95753bb24

View File

@@ -10,7 +10,9 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@@ -52,6 +54,10 @@ abstract class StreamConnection implements DatabaseListener {
private static final Logger LOG = private static final Logger LOG =
Logger.getLogger(StreamConnection.class.getName()); Logger.getLogger(StreamConnection.class.getName());
private static final Runnable CLOSE = new Runnable() {
public void run() {}
};
protected final DatabaseComponent db; protected final DatabaseComponent db;
protected final ConnectionReaderFactory connReaderFactory; protected final ConnectionReaderFactory connReaderFactory;
protected final ConnectionWriterFactory connWriterFactory; protected final ConnectionWriterFactory connWriterFactory;
@@ -62,12 +68,11 @@ abstract class StreamConnection implements DatabaseListener {
private final Executor dbExecutor, verificationExecutor; private final Executor dbExecutor, verificationExecutor;
private final AtomicBoolean canSendOffer; private final AtomicBoolean canSendOffer;
private final LinkedList<Runnable> writerTasks; // Locking: this private final BlockingQueue<Runnable> writerTasks;
private Collection<MessageId> offered = null; // Locking: this private Collection<MessageId> offered = null; // Locking: this
private volatile ProtocolWriter writer = null; private volatile ProtocolWriter writer = null;
private volatile boolean closed = false;
StreamConnection(@DatabaseExecutor Executor dbExecutor, StreamConnection(@DatabaseExecutor Executor dbExecutor,
@VerificationExecutor Executor verificationExecutor, @VerificationExecutor Executor verificationExecutor,
@@ -86,7 +91,7 @@ abstract class StreamConnection implements DatabaseListener {
this.contactId = contactId; this.contactId = contactId;
this.transport = transport; this.transport = transport;
canSendOffer = new AtomicBoolean(false); canSendOffer = new AtomicBoolean(false);
writerTasks = new LinkedList<Runnable>(); writerTasks = new LinkedBlockingQueue<Runnable>();
} }
protected abstract ConnectionReader createConnectionReader() protected abstract ConnectionReader createConnectionReader()
@@ -100,7 +105,7 @@ abstract class StreamConnection implements DatabaseListener {
dbExecutor.execute(new GenerateAcks()); dbExecutor.execute(new GenerateAcks());
} else if(e instanceof ContactRemovedEvent) { } else if(e instanceof ContactRemovedEvent) {
ContactId c = ((ContactRemovedEvent) e).getContactId(); ContactId c = ((ContactRemovedEvent) e).getContactId();
if(contactId.equals(c)) closed = true; if(contactId.equals(c)) writerTasks.add(CLOSE);
} else if(e instanceof MessagesAddedEvent) { } else if(e instanceof MessagesAddedEvent) {
if(canSendOffer.getAndSet(false)) if(canSendOffer.getAndSet(false))
dbExecutor.execute(new GenerateOffer()); dbExecutor.execute(new GenerateOffer());
@@ -133,6 +138,7 @@ abstract class StreamConnection implements DatabaseListener {
Request r = reader.readRequest(); Request r = reader.readRequest();
// Retrieve the offered message IDs // Retrieve the offered message IDs
Collection<MessageId> offered = getOfferedMessageIds(); Collection<MessageId> offered = getOfferedMessageIds();
if(offered == null) throw new FormatException();
// Work out which messages were requested // Work out which messages were requested
BitSet b = r.getBitmap(); BitSet b = r.getBitmap();
List<MessageId> requested = new LinkedList<MessageId>(); List<MessageId> requested = new LinkedList<MessageId>();
@@ -168,16 +174,13 @@ abstract class StreamConnection implements DatabaseListener {
} }
} }
private synchronized Collection<MessageId> getOfferedMessageIds() private synchronized Collection<MessageId> getOfferedMessageIds() {
throws FormatException {
if(offered == null) throw new FormatException(); // Unexpected request
Collection<MessageId> ids = offered; Collection<MessageId> ids = offered;
offered = null; offered = null;
return ids; return ids;
} }
// Locking: this private synchronized void setOfferedMessageIds(Collection<MessageId> ids) {
private void setOfferedMessageIds(Collection<MessageId> ids) {
assert offered == null; assert offered == null;
offered = ids; offered = ids;
} }
@@ -193,19 +196,14 @@ abstract class StreamConnection implements DatabaseListener {
dbExecutor.execute(new GenerateAcks()); dbExecutor.execute(new GenerateAcks());
dbExecutor.execute(new GenerateOffer()); dbExecutor.execute(new GenerateOffer());
// Main loop // Main loop
while(!closed) { while(true) {
Runnable task = null; try {
synchronized(this) { Runnable task = writerTasks.take();
while(writerTasks.isEmpty()) { if(task == CLOSE) break;
try { task.run();
wait(); } catch(InterruptedException e) {
} catch(InterruptedException e) { Thread.currentThread().interrupt();
Thread.currentThread().interrupt();
}
}
task = writerTasks.poll();
} }
task.run();
} }
transport.dispose(true); transport.dispose(true);
} catch(DbException e) { } catch(DbException e) {
@@ -286,10 +284,7 @@ abstract class StreamConnection implements DatabaseListener {
public void run() { public void run() {
try { try {
Request r = db.receiveOffer(contactId, offer); Request r = db.receiveOffer(contactId, offer);
synchronized(StreamConnection.this) { writerTasks.add(new WriteRequest(r));
writerTasks.add(new WriteRequest(r));
StreamConnection.this.notifyAll();
}
} catch(DbException e) { } catch(DbException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
} }
@@ -379,10 +374,7 @@ abstract class StreamConnection implements DatabaseListener {
try { try {
Ack a = db.generateAck(contactId, maxBatches); Ack a = db.generateAck(contactId, maxBatches);
while(a != null) { while(a != null) {
synchronized(StreamConnection.this) { writerTasks.add(new WriteAck(a));
writerTasks.add(new WriteAck(a));
StreamConnection.this.notifyAll();
}
a = db.generateAck(contactId, maxBatches); a = db.generateAck(contactId, maxBatches);
} }
} catch(DbException e) { } catch(DbException e) {
@@ -425,16 +417,8 @@ abstract class StreamConnection implements DatabaseListener {
int capacity = writer.getMessageCapacityForBatch(Long.MAX_VALUE); int capacity = writer.getMessageCapacityForBatch(Long.MAX_VALUE);
try { try {
RawBatch b = db.generateBatch(contactId, capacity, requested); RawBatch b = db.generateBatch(contactId, capacity, requested);
if(b == null) { if(b == null) new GenerateOffer().run();
// No batch to write - send another offer else writerTasks.add(new WriteBatch(b, requested));
new GenerateOffer().run();
} else {
// Write the batch
synchronized(StreamConnection.this) {
writerTasks.add(new WriteBatch(b, requested));
StreamConnection.this.notifyAll();
}
}
} catch(DbException e) { } catch(DbException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
} }
@@ -456,13 +440,8 @@ abstract class StreamConnection implements DatabaseListener {
assert writer != null; assert writer != null;
try { try {
writer.writeBatch(batch); writer.writeBatch(batch);
if(requested.isEmpty()) { if(requested.isEmpty()) dbExecutor.execute(new GenerateOffer());
// No more batches to send - send another offer else dbExecutor.execute(new GenerateBatch(requested));
dbExecutor.execute(new GenerateOffer());
} else {
// Send another batch
dbExecutor.execute(new GenerateBatch(requested));
}
} catch(IOException e) { } catch(IOException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
transport.dispose(false); transport.dispose(false);
@@ -482,13 +461,10 @@ abstract class StreamConnection implements DatabaseListener {
// No messages to offer - wait for some to be added // No messages to offer - wait for some to be added
canSendOffer.set(true); canSendOffer.set(true);
} else { } else {
synchronized(StreamConnection.this) { // Store the offered message IDs
// Store the offered message IDs setOfferedMessageIds(o.getMessageIds());
setOfferedMessageIds(o.getMessageIds()); // Write the offer on the writer thread
// Write the offer on the writer thread writerTasks.add(new WriteOffer(o));
writerTasks.add(new WriteOffer(o));
StreamConnection.this.notifyAll();
}
} }
} catch(DbException e) { } catch(DbException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
@@ -522,12 +498,7 @@ abstract class StreamConnection implements DatabaseListener {
public void run() { public void run() {
try { try {
SubscriptionUpdate s = db.generateSubscriptionUpdate(contactId); SubscriptionUpdate s = db.generateSubscriptionUpdate(contactId);
if(s != null) { if(s != null) writerTasks.add(new WriteSubscriptionUpdate(s));
synchronized(StreamConnection.this) {
writerTasks.add(new WriteSubscriptionUpdate(s));
StreamConnection.this.notifyAll();
}
}
} catch(DbException e) { } catch(DbException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
} }
@@ -560,12 +531,7 @@ abstract class StreamConnection implements DatabaseListener {
public void run() { public void run() {
try { try {
TransportUpdate t = db.generateTransportUpdate(contactId); TransportUpdate t = db.generateTransportUpdate(contactId);
if(t != null) { if(t != null) writerTasks.add(new WriteTransportUpdate(t));
synchronized(StreamConnection.this) {
writerTasks.add(new WriteTransportUpdate(t));
StreamConnection.this.notifyAll();
}
}
} catch(DbException e) { } catch(DbException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
} }