java, apache-camel, amazon-sqs

Does Apache Camel skip deleting a message read from a queue if processing takes too long?


I have a single Apache Camel consumer that reads messages from an SQS standard queue and processes them.
Some messages take very little time to process (1-2 min), whereas others take a long time (70-80 min).

I have not explicitly specified a visibility timeout in the Camel configurations, so each message read is given a visibility timeout of 30s, as specified in the queue configuration.

Messages that take a short time to process work fine, but for messages that take a long time, Apache Camel does not delete the message once it has finished processing. The message becomes available in the queue again, Camel reads it again, processes it for a long time, and the cycle repeats.

I understand that the messages are reappearing in the queue because the visibility timeout is low, but my concerns are:

1. Why doesn't Camel delete the message as soon as processing is finished for these long-running jobs?
2. Messages for short jobs are being deleted correctly, even though the short-running jobs also exceed the visibility timeout of 30 sec.

There are no errors or exceptions occurring anywhere.
I even made a sample POC job that does nothing but wait for a long time (85 minutes) and then print a success message. The job completes successfully, but Camel doesn't delete the message. Why?


Solution

  • I fixed this with the Camel configuration option 'extendMessageVisibility'. When this option is set to true, Camel runs a background task that keeps extending the visibility timeout of the message until the consumer has finished processing it.

    Again, my real problem wasn't that the message became visible on the queue again, but that it wasn't being deleted by the Camel consumer.

    However, based on my observation, I see that applying the extendMessageVisibility option has led to Camel keeping track of the message and then deleting it correctly.

    We can configure the queue URI like this:

    String queueUri = "aws-sqs://" + queueName + "?amazonSQSClient=#client&extendMessageVisibility=true&visibilityTimeout=60";
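
As a minimal sketch of the same idea, the URI can be assembled in one small helper so the options stay in one place. The class name `SqsUriSketch`, the method `buildQueueUri`, and the queue name `my-queue` are hypothetical and not part of Camel. The option names and the `#client` registry reference are taken from the URI above. The resulting string would typically be passed to `from(...)` in a Camel route.

```java
public class SqsUriSketch {

    // Hypothetical helper (not a Camel API): builds the endpoint URI shown above.
    // Assumes an SQS client is registered in the Camel registry under the name "client".
    static String buildQueueUri(String queueName, int visibilityTimeoutSeconds) {
        if (queueName == null || queueName.isEmpty()) {
            throw new IllegalArgumentException("queueName must not be empty");
        }
        if (visibilityTimeoutSeconds <= 0) {
            throw new IllegalArgumentException("visibilityTimeoutSeconds must be positive");
        }
        return "aws-sqs://" + queueName
                + "?amazonSQSClient=#client"
                // Ask Camel to keep extending the visibility timeout while processing runs.
                + "&extendMessageVisibility=true"
                // Visibility timeout in seconds, as in the URI above.
                + "&visibilityTimeout=" + visibilityTimeoutSeconds;
    }

    public static void main(String[] args) {
        // "my-queue" is a placeholder queue name for illustration.
        System.out.println(buildQueueUri("my-queue", 60));
    }
}
```

Running `main` prints `aws-sqs://my-queue?amazonSQSClient=#client&extendMessageVisibility=true&visibilityTimeout=60`.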