Cancel database queries if the messaging session has been interrupted.

This commit is contained in:
akwizgran
2014-11-05 18:34:59 +00:00
parent 5b8eab6035
commit dfa4860200
2 changed files with 32 additions and 4 deletions

View File

@@ -165,12 +165,22 @@ class ReactiveOutgoingSession implements MessagingSession, EventListener {
if(((MessageToRequestEvent) e).getContactId().equals(contactId))
dbExecutor.execute(new GenerateRequest());
} else if(e instanceof RemoteRetentionTimeUpdatedEvent) {
dbExecutor.execute(new GenerateRetentionAck());
RemoteRetentionTimeUpdatedEvent r =
(RemoteRetentionTimeUpdatedEvent) e;
if(r.getContactId().equals(contactId))
dbExecutor.execute(new GenerateRetentionAck());
} else if(e instanceof RemoteSubscriptionsUpdatedEvent) {
dbExecutor.execute(new GenerateSubscriptionAck());
dbExecutor.execute(new GenerateOffer());
RemoteSubscriptionsUpdatedEvent r =
(RemoteSubscriptionsUpdatedEvent) e;
if(r.getContactId().equals(contactId)) {
dbExecutor.execute(new GenerateSubscriptionAck());
dbExecutor.execute(new GenerateOffer());
}
} else if(e instanceof RemoteTransportsUpdatedEvent) {
dbExecutor.execute(new GenerateTransportAcks());
RemoteTransportsUpdatedEvent r =
(RemoteTransportsUpdatedEvent) e;
if(r.getContactId().equals(contactId))
dbExecutor.execute(new GenerateTransportAcks());
} else if(e instanceof TransportRemovedEvent) {
TransportRemovedEvent t = (TransportRemovedEvent) e;
if(ctx.getTransportId().equals(t.getTransportId())) {
@@ -184,6 +194,7 @@ class ReactiveOutgoingSession implements MessagingSession, EventListener {
private class GenerateAck implements Runnable {
public void run() {
if(interrupted) return;
int maxMessages = packetWriter.getMaxMessagesForAck(Long.MAX_VALUE);
try {
Ack a = db.generateAck(contactId, maxMessages);
@@ -217,6 +228,7 @@ class ReactiveOutgoingSession implements MessagingSession, EventListener {
private class GenerateBatch implements Runnable {
public void run() {
if(interrupted) return;
try {
Collection<byte[]> b = db.generateRequestedBatch(contactId,
MAX_PACKET_LENGTH, maxLatency);
@@ -250,6 +262,7 @@ class ReactiveOutgoingSession implements MessagingSession, EventListener {
private class GenerateOffer implements Runnable {
public void run() {
if(interrupted) return;
int maxMessages = packetWriter.getMaxMessagesForOffer(
Long.MAX_VALUE);
try {
@@ -284,6 +297,7 @@ class ReactiveOutgoingSession implements MessagingSession, EventListener {
private class GenerateRequest implements Runnable {
public void run() {
if(interrupted) return;
int maxMessages = packetWriter.getMaxMessagesForRequest(
Long.MAX_VALUE);
try {
@@ -318,6 +332,7 @@ class ReactiveOutgoingSession implements MessagingSession, EventListener {
private class GenerateRetentionAck implements Runnable {
public void run() {
if(interrupted) return;
try {
RetentionAck a = db.generateRetentionAck(contactId);
if(LOG.isLoggable(INFO))
@@ -351,6 +366,7 @@ class ReactiveOutgoingSession implements MessagingSession, EventListener {
private class GenerateRetentionUpdate implements Runnable {
public void run() {
if(interrupted) return;
try {
RetentionUpdate u =
db.generateRetentionUpdate(contactId, maxLatency);
@@ -385,6 +401,7 @@ class ReactiveOutgoingSession implements MessagingSession, EventListener {
private class GenerateSubscriptionAck implements Runnable {
public void run() {
if(interrupted) return;
try {
SubscriptionAck a = db.generateSubscriptionAck(contactId);
if(LOG.isLoggable(INFO))
@@ -418,6 +435,7 @@ class ReactiveOutgoingSession implements MessagingSession, EventListener {
private class GenerateSubscriptionUpdate implements Runnable {
public void run() {
if(interrupted) return;
try {
SubscriptionUpdate u =
db.generateSubscriptionUpdate(contactId, maxLatency);
@@ -452,6 +470,7 @@ class ReactiveOutgoingSession implements MessagingSession, EventListener {
private class GenerateTransportAcks implements Runnable {
public void run() {
if(interrupted) return;
try {
Collection<TransportAck> acks =
db.generateTransportAcks(contactId);
@@ -485,6 +504,7 @@ class ReactiveOutgoingSession implements MessagingSession, EventListener {
private class GenerateTransportUpdates implements Runnable {
public void run() {
if(interrupted) return;
try {
Collection<TransportUpdate> t =
db.generateTransportUpdates(contactId, maxLatency);

View File

@@ -121,6 +121,7 @@ class SinglePassOutgoingSession implements MessagingSession {
private class GenerateAck implements Runnable {
public void run() {
if(interrupted) return;
int maxMessages = packetWriter.getMaxMessagesForAck(Long.MAX_VALUE);
try {
Ack a = db.generateAck(contactId, maxMessages);
@@ -155,6 +156,7 @@ class SinglePassOutgoingSession implements MessagingSession {
private class GenerateBatch implements Runnable {
public void run() {
if(interrupted) return;
try {
Collection<byte[]> b = db.generateBatch(contactId,
MAX_PACKET_LENGTH, maxLatency);
@@ -189,6 +191,7 @@ class SinglePassOutgoingSession implements MessagingSession {
private class GenerateRetentionAck implements Runnable {
public void run() {
if(interrupted) return;
try {
RetentionAck a = db.generateRetentionAck(contactId);
if(LOG.isLoggable(INFO))
@@ -223,6 +226,7 @@ class SinglePassOutgoingSession implements MessagingSession {
private class GenerateRetentionUpdate implements Runnable {
public void run() {
if(interrupted) return;
try {
RetentionUpdate u =
db.generateRetentionUpdate(contactId, maxLatency);
@@ -258,6 +262,7 @@ class SinglePassOutgoingSession implements MessagingSession {
private class GenerateSubscriptionAck implements Runnable {
public void run() {
if(interrupted) return;
try {
SubscriptionAck a = db.generateSubscriptionAck(contactId);
if(LOG.isLoggable(INFO))
@@ -292,6 +297,7 @@ class SinglePassOutgoingSession implements MessagingSession {
private class GenerateSubscriptionUpdate implements Runnable {
public void run() {
if(interrupted) return;
try {
SubscriptionUpdate u =
db.generateSubscriptionUpdate(contactId, maxLatency);
@@ -327,6 +333,7 @@ class SinglePassOutgoingSession implements MessagingSession {
private class GenerateTransportAcks implements Runnable {
public void run() {
if(interrupted) return;
try {
Collection<TransportAck> acks =
db.generateTransportAcks(contactId);
@@ -361,6 +368,7 @@ class SinglePassOutgoingSession implements MessagingSession {
private class GenerateTransportUpdates implements Runnable {
public void run() {
if(interrupted) return;
try {
Collection<TransportUpdate> t =
db.generateTransportUpdates(contactId, maxLatency);