Minor code cleanup.

This commit is contained in:
akwizgran
2011-12-07 21:02:18 +00:00
parent 2020f60ebf
commit 22dfe947fa
3 changed files with 24 additions and 33 deletions

View File

@@ -36,13 +36,13 @@ class IncomingStreamConnection extends StreamConnection {
@Override @Override
protected ConnectionReader createConnectionReader() throws IOException { protected ConnectionReader createConnectionReader() throws IOException {
return connReaderFactory.createConnectionReader( return connReaderFactory.createConnectionReader(
connection.getInputStream(), ctx.getSecret(), tag); transport.getInputStream(), ctx.getSecret(), tag);
} }
@Override @Override
protected ConnectionWriter createConnectionWriter() throws IOException { protected ConnectionWriter createConnectionWriter() throws IOException {
return connWriterFactory.createConnectionWriter( return connWriterFactory.createConnectionWriter(
connection.getOutputStream(), Long.MAX_VALUE, ctx.getSecret(), transport.getOutputStream(), Long.MAX_VALUE, ctx.getSecret(),
tag); tag);
} }
} }

View File

@@ -43,7 +43,7 @@ class OutgoingStreamConnection extends StreamConnection {
ctx = db.getConnectionContext(contactId, transportIndex); ctx = db.getConnectionContext(contactId, transportIndex);
} }
return connReaderFactory.createConnectionReader( return connReaderFactory.createConnectionReader(
connection.getInputStream(), ctx.getSecret()); transport.getInputStream(), ctx.getSecret());
} }
@Override @Override
@@ -54,6 +54,6 @@ class OutgoingStreamConnection extends StreamConnection {
ctx = db.getConnectionContext(contactId, transportIndex); ctx = db.getConnectionContext(contactId, transportIndex);
} }
return connWriterFactory.createConnectionWriter( return connWriterFactory.createConnectionWriter(
connection.getOutputStream(), Long.MAX_VALUE, ctx.getSecret()); transport.getOutputStream(), Long.MAX_VALUE, ctx.getSecret());
} }
} }

View File

@@ -50,11 +50,6 @@ abstract class StreamConnection implements DatabaseListener {
private static final Logger LOG = private static final Logger LOG =
Logger.getLogger(StreamConnection.class.getName()); Logger.getLogger(StreamConnection.class.getName());
// A canary indicating that the connection should be closed
private static final Runnable CLOSE_CONNECTION = new Runnable() {
public void run() {}
};
protected final Executor dbExecutor; protected final Executor dbExecutor;
protected final DatabaseComponent db; protected final DatabaseComponent db;
protected final ConnectionReaderFactory connReaderFactory; protected final ConnectionReaderFactory connReaderFactory;
@@ -62,21 +57,22 @@ abstract class StreamConnection implements DatabaseListener {
protected final ProtocolReaderFactory protoReaderFactory; protected final ProtocolReaderFactory protoReaderFactory;
protected final ProtocolWriterFactory protoWriterFactory; protected final ProtocolWriterFactory protoWriterFactory;
protected final ContactId contactId; protected final ContactId contactId;
protected final StreamTransportConnection connection; protected final StreamTransportConnection transport;
private final AtomicBoolean canSendOffer = new AtomicBoolean(false); private final AtomicBoolean canSendOffer;
private final LinkedList<Runnable> writerTasks; // Locking: this private final LinkedList<Runnable> writerTasks; // Locking: this
private Collection<MessageId> offered = null; // Locking: this private Collection<MessageId> offered = null; // Locking: this
private volatile ProtocolWriter writer = null; private volatile ProtocolWriter writer = null;
private volatile boolean closed = false;
StreamConnection(@DatabaseExecutor Executor dbExecutor, StreamConnection(@DatabaseExecutor Executor dbExecutor,
DatabaseComponent db, ConnectionReaderFactory connReaderFactory, DatabaseComponent db, ConnectionReaderFactory connReaderFactory,
ConnectionWriterFactory connWriterFactory, ConnectionWriterFactory connWriterFactory,
ProtocolReaderFactory protoReaderFactory, ProtocolReaderFactory protoReaderFactory,
ProtocolWriterFactory protoWriterFactory, ContactId contactId, ProtocolWriterFactory protoWriterFactory, ContactId contactId,
StreamTransportConnection connection) { StreamTransportConnection transport) {
this.dbExecutor = dbExecutor; this.dbExecutor = dbExecutor;
this.db = db; this.db = db;
this.connReaderFactory = connReaderFactory; this.connReaderFactory = connReaderFactory;
@@ -84,7 +80,8 @@ abstract class StreamConnection implements DatabaseListener {
this.protoReaderFactory = protoReaderFactory; this.protoReaderFactory = protoReaderFactory;
this.protoWriterFactory = protoWriterFactory; this.protoWriterFactory = protoWriterFactory;
this.contactId = contactId; this.contactId = contactId;
this.connection = connection; this.transport = transport;
canSendOffer = new AtomicBoolean(false);
writerTasks = new LinkedList<Runnable>(); writerTasks = new LinkedList<Runnable>();
} }
@@ -99,12 +96,7 @@ abstract class StreamConnection implements DatabaseListener {
dbExecutor.execute(new GenerateAcks()); dbExecutor.execute(new GenerateAcks());
} else if(e instanceof ContactRemovedEvent) { } else if(e instanceof ContactRemovedEvent) {
ContactId c = ((ContactRemovedEvent) e).getContactId(); ContactId c = ((ContactRemovedEvent) e).getContactId();
if(contactId.equals(c)) { if(contactId.equals(c)) closed = true;
synchronized(this) {
writerTasks.add(CLOSE_CONNECTION);
notifyAll();
}
}
} else if(e instanceof MessagesAddedEvent) { } else if(e instanceof MessagesAddedEvent) {
if(canSendOffer.getAndSet(false)) if(canSendOffer.getAndSet(false))
dbExecutor.execute(new GenerateOffer()); dbExecutor.execute(new GenerateOffer());
@@ -162,13 +154,13 @@ abstract class StreamConnection implements DatabaseListener {
throw new FormatException(); throw new FormatException();
} }
} }
connection.dispose(true); transport.dispose(true);
} catch(DbException e) { } catch(DbException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
connection.dispose(false); transport.dispose(false);
} catch(IOException e) { } catch(IOException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
connection.dispose(false); transport.dispose(false);
} }
} }
@@ -198,7 +190,7 @@ abstract class StreamConnection implements DatabaseListener {
dbExecutor.execute(new GenerateAcks()); dbExecutor.execute(new GenerateAcks());
dbExecutor.execute(new GenerateOffer()); dbExecutor.execute(new GenerateOffer());
// Main loop // Main loop
while(true) { while(!closed) {
Runnable task = null; Runnable task = null;
synchronized(this) { synchronized(this) {
while(writerTasks.isEmpty()) { while(writerTasks.isEmpty()) {
@@ -210,16 +202,15 @@ abstract class StreamConnection implements DatabaseListener {
} }
task = writerTasks.poll(); task = writerTasks.poll();
} }
if(task == CLOSE_CONNECTION) break;
task.run(); task.run();
} }
connection.dispose(true); transport.dispose(true);
} catch(DbException e) { } catch(DbException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
connection.dispose(false); transport.dispose(false);
} catch(IOException e) { } catch(IOException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
connection.dispose(false); transport.dispose(false);
} }
} }
@@ -299,7 +290,7 @@ abstract class StreamConnection implements DatabaseListener {
writer.writeRequest(request); writer.writeRequest(request);
} catch(IOException e) { } catch(IOException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
connection.dispose(false); transport.dispose(false);
} }
} }
} }
@@ -394,7 +385,7 @@ abstract class StreamConnection implements DatabaseListener {
writer.writeAck(ack); writer.writeAck(ack);
} catch(IOException e) { } catch(IOException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
connection.dispose(false); transport.dispose(false);
} }
} }
} }
@@ -453,7 +444,7 @@ abstract class StreamConnection implements DatabaseListener {
} }
} catch(IOException e) { } catch(IOException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
connection.dispose(false); transport.dispose(false);
} }
} }
} }
@@ -499,7 +490,7 @@ abstract class StreamConnection implements DatabaseListener {
writer.writeOffer(offer); writer.writeOffer(offer);
} catch(IOException e) { } catch(IOException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
connection.dispose(false); transport.dispose(false);
} }
} }
} }
@@ -537,7 +528,7 @@ abstract class StreamConnection implements DatabaseListener {
writer.writeSubscriptionUpdate(update); writer.writeSubscriptionUpdate(update);
} catch(IOException e) { } catch(IOException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
connection.dispose(false); transport.dispose(false);
} }
} }
} }
@@ -575,7 +566,7 @@ abstract class StreamConnection implements DatabaseListener {
writer.writeTransportUpdate(update); writer.writeTransportUpdate(update);
} catch(IOException e) { } catch(IOException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
connection.dispose(false); transport.dispose(false);
} }
} }
} }