Files
briar/components/net/sf/briar/transport/stream/StreamConnection.java
2011-12-05 23:16:27 +00:00

473 lines
15 KiB
Java

package net.sf.briar.transport.stream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.sf.briar.api.ContactId;
import net.sf.briar.api.FormatException;
import net.sf.briar.api.db.DatabaseComponent;
import net.sf.briar.api.db.DbException;
import net.sf.briar.api.db.event.BatchReceivedEvent;
import net.sf.briar.api.db.event.ContactRemovedEvent;
import net.sf.briar.api.db.event.DatabaseEvent;
import net.sf.briar.api.db.event.DatabaseListener;
import net.sf.briar.api.db.event.LocalTransportsUpdatedEvent;
import net.sf.briar.api.db.event.MessagesAddedEvent;
import net.sf.briar.api.db.event.SubscriptionsUpdatedEvent;
import net.sf.briar.api.protocol.Ack;
import net.sf.briar.api.protocol.MessageId;
import net.sf.briar.api.protocol.Offer;
import net.sf.briar.api.protocol.ProtocolReader;
import net.sf.briar.api.protocol.ProtocolReaderFactory;
import net.sf.briar.api.protocol.Request;
import net.sf.briar.api.protocol.SubscriptionUpdate;
import net.sf.briar.api.protocol.TransportUpdate;
import net.sf.briar.api.protocol.UnverifiedBatch;
import net.sf.briar.api.protocol.writers.AckWriter;
import net.sf.briar.api.protocol.writers.BatchWriter;
import net.sf.briar.api.protocol.writers.OfferWriter;
import net.sf.briar.api.protocol.writers.ProtocolWriterFactory;
import net.sf.briar.api.protocol.writers.RequestWriter;
import net.sf.briar.api.protocol.writers.SubscriptionUpdateWriter;
import net.sf.briar.api.protocol.writers.TransportUpdateWriter;
import net.sf.briar.api.transport.ConnectionReader;
import net.sf.briar.api.transport.ConnectionReaderFactory;
import net.sf.briar.api.transport.ConnectionWriter;
import net.sf.briar.api.transport.ConnectionWriterFactory;
import net.sf.briar.api.transport.StreamTransportConnection;
abstract class StreamConnection implements DatabaseListener {
private static final int MAX_WAITING_DB_WRITES = 5;
private static enum State { SEND_OFFER, IDLE, AWAIT_REQUEST, SEND_BATCHES };
private static final Logger LOG =
Logger.getLogger(StreamConnection.class.getName());
protected final Executor executor;
protected final ConnectionReaderFactory connReaderFactory;
protected final ConnectionWriterFactory connWriterFactory;
protected final DatabaseComponent db;
protected final ProtocolReaderFactory protoReaderFactory;
protected final ProtocolWriterFactory protoWriterFactory;
protected final ContactId contactId;
protected final StreamTransportConnection connection;
private final Semaphore semaphore;
private int writerFlags = 0; // Locking: this
private Collection<MessageId> offered = null; // Locking: this
private LinkedList<MessageId> requested = null; // Locking: this
private Offer incomingOffer = null; // Locking: this
StreamConnection(Executor executor,
ConnectionReaderFactory connReaderFactory,
ConnectionWriterFactory connWriterFactory, DatabaseComponent db,
ProtocolReaderFactory protoReaderFactory,
ProtocolWriterFactory protoWriterFactory, ContactId contactId,
StreamTransportConnection connection) {
this.executor = executor;
this.connReaderFactory = connReaderFactory;
this.connWriterFactory = connWriterFactory;
this.db = db;
this.protoReaderFactory = protoReaderFactory;
this.protoWriterFactory = protoWriterFactory;
this.contactId = contactId;
this.connection = connection;
semaphore = new Semaphore(MAX_WAITING_DB_WRITES);
}
protected abstract ConnectionReader createConnectionReader()
throws DbException, IOException;
protected abstract ConnectionWriter createConnectionWriter()
throws DbException, IOException ;
public void eventOccurred(DatabaseEvent e) {
synchronized(this) {
if(e instanceof BatchReceivedEvent) {
writerFlags |= Flags.BATCH_RECEIVED;
notifyAll();
} else if(e instanceof ContactRemovedEvent) {
ContactId c = ((ContactRemovedEvent) e).getContactId();
if(contactId.equals(c)) {
writerFlags |= Flags.CONTACT_REMOVED;
notifyAll();
}
} else if(e instanceof MessagesAddedEvent) {
writerFlags |= Flags.MESSAGES_ADDED;
notifyAll();
} else if(e instanceof SubscriptionsUpdatedEvent) {
Collection<ContactId> affected =
((SubscriptionsUpdatedEvent) e).getAffectedContacts();
if(affected.contains(contactId)) {
writerFlags |= Flags.SUBSCRIPTIONS_UPDATED;
notifyAll();
}
} else if(e instanceof LocalTransportsUpdatedEvent) {
writerFlags |= Flags.TRANSPORTS_UPDATED;
notifyAll();
}
}
}
void read() {
try {
InputStream in = createConnectionReader().getInputStream();
ProtocolReader proto = protoReaderFactory.createProtocolReader(in);
while(!proto.eof()) {
if(proto.hasAck()) {
final Ack a = proto.readAck();
// Store the ack on another thread
semaphore.acquire();
executor.execute(new Runnable() {
public void run() {
try {
db.receiveAck(contactId, a);
} catch(DbException e) {
if(LOG.isLoggable(Level.WARNING))
LOG.warning(e.getMessage());
}
semaphore.release();
}
});
} else if(proto.hasBatch()) {
final UnverifiedBatch b = proto.readBatch();
// Verify and store the batch on another thread
semaphore.acquire();
executor.execute(new Runnable() {
public void run() {
try {
db.receiveBatch(contactId, b.verify());
} catch(DbException e) {
if(LOG.isLoggable(Level.WARNING))
LOG.warning(e.getMessage());
} catch(GeneralSecurityException e) {
if(LOG.isLoggable(Level.WARNING))
LOG.warning(e.getMessage());
}
semaphore.release();
}
});
} else if(proto.hasOffer()) {
Offer o = proto.readOffer();
// Store the incoming offer and notify the writer
synchronized(this) {
writerFlags |= Flags.OFFER_RECEIVED;
incomingOffer = o;
notifyAll();
}
} else if(proto.hasRequest()) {
Request r = proto.readRequest();
// Retrieve the offered message IDs
Collection<MessageId> off;
synchronized(this) {
if(offered == null)
throw new IOException("Unexpected request packet");
off = offered;
offered = null;
}
// Work out which messages were requested
BitSet b = r.getBitmap();
LinkedList<MessageId> req = new LinkedList<MessageId>();
List<MessageId> seen = new ArrayList<MessageId>();
int i = 0;
for(MessageId m : off) {
if(b.get(i++)) req.add(m);
else seen.add(m);
}
// Mark the unrequested messages as seen on another thread
final List<MessageId> l =
Collections.unmodifiableList(seen);
semaphore.acquire();
executor.execute(new Runnable() {
public void run() {
try {
db.setSeen(contactId, l);
} catch(DbException e) {
if(LOG.isLoggable(Level.WARNING))
LOG.warning(e.getMessage());
}
semaphore.release();
}
});
// Store the requested message IDs and notify the writer
synchronized(this) {
if(requested != null)
throw new IOException("Unexpected request packet");
requested = req;
writerFlags |= Flags.REQUEST_RECEIVED;
notifyAll();
}
} else if(proto.hasSubscriptionUpdate()) {
final SubscriptionUpdate s = proto.readSubscriptionUpdate();
// Store the update on another thread
semaphore.acquire();
executor.execute(new Runnable() {
public void run() {
try {
db.receiveSubscriptionUpdate(contactId, s);
} catch(DbException e) {
if(LOG.isLoggable(Level.WARNING))
LOG.warning(e.getMessage());
}
semaphore.release();
}
});
} else if(proto.hasTransportUpdate()) {
final TransportUpdate t = proto.readTransportUpdate();
// Store the update on another thread
semaphore.acquire();
executor.execute(new Runnable() {
public void run() {
try {
db.receiveTransportUpdate(contactId, t);
} catch(DbException e) {
if(LOG.isLoggable(Level.WARNING))
LOG.warning(e.getMessage());
}
semaphore.release();
}
});
} else {
throw new FormatException();
}
}
} catch(DbException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
connection.dispose(false);
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
} catch(IOException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
connection.dispose(false);
}
// Success
connection.dispose(true);
}
void write() {
try {
OutputStream out = createConnectionWriter().getOutputStream();
// Create the packet writers
AckWriter ackWriter = protoWriterFactory.createAckWriter(out);
BatchWriter batchWriter = protoWriterFactory.createBatchWriter(out);
OfferWriter offerWriter = protoWriterFactory.createOfferWriter(out);
RequestWriter requestWriter =
protoWriterFactory.createRequestWriter(out);
SubscriptionUpdateWriter subscriptionUpdateWriter =
protoWriterFactory.createSubscriptionUpdateWriter(out);
TransportUpdateWriter transportUpdateWriter =
protoWriterFactory.createTransportUpdateWriter(out);
// Send the initial packets: transports, subs, any waiting acks
sendTransportUpdate(transportUpdateWriter);
sendSubscriptionUpdate(subscriptionUpdateWriter);
sendAcks(ackWriter);
State state = State.SEND_OFFER;
// Main loop
while(true) {
int flags = 0;
switch(state) {
case SEND_OFFER:
// Try to send an offer
if(sendOffer(offerWriter)) state = State.AWAIT_REQUEST;
else state = State.IDLE;
break;
case IDLE:
// Wait for one or more flags to be raised
synchronized(this) {
while(writerFlags == 0) {
try {
wait();
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
}
flags = writerFlags;
writerFlags = 0;
}
// Handle the flags in approximate order of urgency
if((flags & Flags.CONTACT_REMOVED) != 0) {
connection.dispose(true);
return;
}
if((flags & Flags.TRANSPORTS_UPDATED) != 0) {
sendTransportUpdate(transportUpdateWriter);
}
if((flags & Flags.SUBSCRIPTIONS_UPDATED) != 0) {
sendSubscriptionUpdate(subscriptionUpdateWriter);
}
if((flags & Flags.BATCH_RECEIVED) != 0) {
sendAcks(ackWriter);
}
if((flags & Flags.OFFER_RECEIVED) != 0) {
sendRequest(requestWriter);
}
if((flags & Flags.REQUEST_RECEIVED) != 0) {
// Should only be received in state AWAIT_REQUEST
throw new IOException("Unexpected request packet");
}
if((flags & Flags.MESSAGES_ADDED) != 0) {
state = State.SEND_OFFER;
}
break;
case AWAIT_REQUEST:
// Wait for one or more flags to be raised
synchronized(this) {
while(writerFlags == 0) {
try {
wait();
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
}
flags = writerFlags;
writerFlags = 0;
}
// Handle the flags in approximate order of urgency
if((flags & Flags.CONTACT_REMOVED) != 0) {
connection.dispose(true);
return;
}
if((flags & Flags.TRANSPORTS_UPDATED) != 0) {
sendTransportUpdate(transportUpdateWriter);
}
if((flags & Flags.SUBSCRIPTIONS_UPDATED) != 0) {
sendSubscriptionUpdate(subscriptionUpdateWriter);
}
if((flags & Flags.BATCH_RECEIVED) != 0) {
sendAcks(ackWriter);
}
if((flags & Flags.OFFER_RECEIVED) != 0) {
sendRequest(requestWriter);
}
if((flags & Flags.REQUEST_RECEIVED) != 0) {
state = State.SEND_BATCHES;
}
if((flags & Flags.MESSAGES_ADDED) != 0) {
// Ignored in this state
}
break;
case SEND_BATCHES:
// Check whether any flags have been raised
synchronized(this) {
flags = writerFlags;
writerFlags = 0;
}
// Handle the flags in approximate order of urgency
if((flags & Flags.CONTACT_REMOVED) != 0) {
connection.dispose(true);
return;
}
if((flags & Flags.TRANSPORTS_UPDATED) != 0) {
sendTransportUpdate(transportUpdateWriter);
}
if((flags & Flags.SUBSCRIPTIONS_UPDATED) != 0) {
sendSubscriptionUpdate(subscriptionUpdateWriter);
}
if((flags & Flags.BATCH_RECEIVED) != 0) {
sendAcks(ackWriter);
}
if((flags & Flags.OFFER_RECEIVED) != 0) {
sendRequest(requestWriter);
}
if((flags & Flags.REQUEST_RECEIVED) != 0) {
// Should only be received in state AWAIT_REQUEST
throw new IOException("Unexpected request packet");
}
if((flags & Flags.MESSAGES_ADDED) != 0) {
// Ignored in this state
}
// Try to send a batch
if(!sendBatch(batchWriter)) state = State.SEND_OFFER;
break;
}
}
} catch(DbException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
connection.dispose(false);
} catch(IOException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
connection.dispose(false);
}
// Success
connection.dispose(true);
}
private void sendAcks(AckWriter a) throws DbException, IOException {
while(db.generateAck(contactId, a));
}
private boolean sendBatch(BatchWriter b) throws DbException, IOException {
Collection<MessageId> req;
// Retrieve the requested message IDs
synchronized(this) {
assert offered == null;
assert requested != null;
req = requested;
}
// Try to generate a batch, updating the collection of message IDs
boolean anyAdded = db.generateBatch(contactId, b, req);
// If no more batches can be generated, discard the remaining IDs
if(!anyAdded) {
synchronized(this) {
assert offered == null;
assert requested == req;
requested = null;
}
}
return anyAdded;
}
private boolean sendOffer(OfferWriter o) throws DbException, IOException {
// Generate an offer
Collection<MessageId> off = db.generateOffer(contactId, o);
// Store the offered message IDs
synchronized(this) {
assert offered == null;
assert requested == null;
offered = off;
}
return !off.isEmpty();
}
private void sendRequest(RequestWriter r) throws DbException, IOException {
Offer o;
// Retrieve the incoming offer
synchronized(this) {
assert incomingOffer != null;
o = incomingOffer;
incomingOffer = null;
}
// Process the offer and generate a request
db.receiveOffer(contactId, o, r);
}
private void sendTransportUpdate(TransportUpdateWriter t)
throws DbException, IOException {
db.generateTransportUpdate(contactId, t);
}
private void sendSubscriptionUpdate(SubscriptionUpdateWriter s)
throws DbException, IOException {
db.generateSubscriptionUpdate(contactId, s);
}
}