Search code examples
javaamazon-web-servicesspring-bootspring-cloudamazon-sqs

SQS - all messages goes into flight even when maxNumberOfMessages is set to 1


I have a defined pod scaling based on the number of messages in sqs. And i want each pod to process 1 message.

So if i have 3 messages, i will have 3 pods and each processing 1 message.

This is how i am retreiving the messages from sqs. with withMaxNumberOfMessages(1)

ReceiveMessageRequest receiveMessageRequest = new 
ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(1).withWaitTimeSeconds(10);
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
System.out.println("Number of messages - "+messages.size());

And i can see the number of messages it picks is 1.

Issue that i am facing is when 1 pod run, all the messages in queue goes into flight-mode. And the remaining pods gets zero messages to read.

Why is that happening. even though i specified maxNumberOfMessages to 1 , why all the messages goes into flight-mod. I expect it to just pick 1 message and that message goes into flight-mode and the remaining messages remain in queue and is available for other pods

This is how i ran the code on pod startup

@EventListener(ApplicationReadyEvent.class)
    public void init() throws InterruptedException {
        SQSS3Event message = sqsRepository.getMessage(queueUrl);
        while(message != null){
            System.out.println(message.getBucketName());
            System.out.println(message.getFileName());
            System.out.println(message.getReceiptHandle());
            sqsRepository.changeMessageVisibility(queueUrl, message.getReceiptHandle(), 70);
            Thread.sleep(60000);
            sqsRepository.changeMessageVisibility(queueUrl, message.getReceiptHandle(), 10);
            sqsRepository.deleteMessage(queueUrl,message.getReceiptHandle());

            message = sqsRepository.getMessage(queueUrl);
        }
        System.out.println("No more messages to process");
    }

And this is the helper method for retrieving message from SQS

public SQSS3Event getMessage(String queueUrl){
    ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(1).withWaitTimeSeconds(10);
    List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
    System.out.println("Number of messages - "+messages.size());
    if(messages.size()>0) {
        S3EventNotification notification = S3EventNotification.parseJson(messages.get(0).getBody());
        return SQSS3Event.builder()
                .bucketName(notification.getRecords().get(0).getS3().getBucket().getName())
                .fileName(notification.getRecords().get(0).getS3().getObject().getKey())
                .receiptHandle(messages.get(0).getReceiptHandle())
                .build();
    }
    else {
        return null;
    }
}

Added print statements to read queue attributes before reading and after reading a message

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest().withQueueUrl(queueUrl).withMaxNumberOfMessages(1).withWaitTimeSeconds(5);
GetQueueAttributesResult att = sqs.getQueueAttributes(queueUrl, List.of("ApproximateNumberOfMessages", "ApproximateNumberOfMessagesNotVisible"));
System.out.println("##########################Before reading##########################");
System.out.println("No of Messages - "+ att.getAttributes().get("ApproximateNumberOfMessages"));
System.out.println("No of Messages on Flight - "+ att.getAttributes().get("ApproximateNumberOfMessagesNotVisible"));
System.out.println("##################################################################");
ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
List<Message> messages = result.getMessages();

System.out.println("No of messages in the result - "+messages.size());

S3EventNotification notification = S3EventNotification.parseJson(messages.get(0).getBody());
att = sqs.getQueueAttributes(queueUrl, List.of("ApproximateNumberOfMessages", "ApproximateNumberOfMessagesNotVisible"));
System.out.println("##########################After reading##########################");
System.out.println("No of Messages - "+ att.getAttributes().get("ApproximateNumberOfMessages"));
System.out.println("No of Messages on Flight - "+ att.getAttributes().get("ApproximateNumberOfMessagesNotVisible"));
System.out.println("##################################################################");

Output

##########################Before reading##########################
No of Messages - 4
No of Messages on Flight - 0
##################################################################
No of messages in the result- 1
##########################After reading##########################
No of Messages - 0
No of Messages on Flight - 4
##################################################################

With the trace

 ##########################Before reading##########################
No of Messages - 2
No of Messages on Flight - 0
##################################################################
2022-04-07 21:59:24.036 TRACE 21096 --- [           main] c.a.s.sqs.buffered.ReceiveQueueBuffer    : Spawned receive batch #1 (1 of 10 inflight) for queue http://localhost:4576/queue/upload-notifications
2022-04-07 21:59:24.166 DEBUG 21096 --- [rWorkerThread-1] c.a.s.sqs.MessageMD5ChecksumHandler      : Message body: {"Records": [{"eventVersion": "2.0", "eventName": "ObjectCreated:Put", "eventTime": "2022-04-07T19:14:39.050963Z", "userIdentity": {"principalId": "AIDAJDPLRKLG7UEXAMPLE"}, "eventSource": "aws:s3", "requestParameters": {"sourceIPAddress": "127.0.0.1"}, "s3": {"configurationId": "testConfigRule", "object": {"versionId": "096fKKXTRTtl3on89fVO.nfljtsv6qko", "eTag": "d41d8cd98f00b204e9800998ecf8427e", "sequencer": "0055AED6DCD90281E5", "key": "3.txt", "size": 1024}, "bucket": {"arn": "arn:aws:s3:::uploads-toprocess", "name": "uploads-toprocess", "ownerIdentity": {"principalId": "A3NL1KOZZKExample"}}, "s3SchemaVersion": "1.0"}, "responseElements": {"x-amz-id-2": "eftixk72aD6Ap51TnqcoF8eFidJG9Z/2", "x-amz-request-id": "2e30a6e9"}, "awsRegion": "us-east-1"}]}
2022-04-07 21:59:24.167 DEBUG 21096 --- [rWorkerThread-1] c.a.s.sqs.MessageMD5ChecksumHandler      : Expected  MD5 of message body: 9428bd5190b9d47af3368b3f67c62d02
2022-04-07 21:59:24.167 DEBUG 21096 --- [rWorkerThread-1] c.a.s.sqs.MessageMD5ChecksumHandler      : Message body: {"Records": [{"eventVersion": "2.0", "eventName": "ObjectCreated:Put", "eventTime": "2022-04-07T19:15:50.375772Z", "userIdentity": {"principalId": "AIDAJDPLRKLG7UEXAMPLE"}, "eventSource": "aws:s3", "requestParameters": {"sourceIPAddress": "127.0.0.1"}, "s3": {"configurationId": "testConfigRule", "object": {"versionId": "096fKKXTRTtl3on89fVO.nfljtsv6qko", "eTag": "d41d8cd98f00b204e9800998ecf8427e", "sequencer": "0055AED6DCD90281E5", "key": "3.txt", "size": 1024}, "bucket": {"arn": "arn:aws:s3:::uploads-toprocess", "name": "uploads-toprocess", "ownerIdentity": {"principalId": "A3NL1KOZZKExample"}}, "s3SchemaVersion": "1.0"}, "responseElements": {"x-amz-id-2": "eftixk72aD6Ap51TnqcoF8eFidJG9Z/2", "x-amz-request-id": "a8ba5ab3"}, "awsRegion": "us-east-1"}]}
2022-04-07 21:59:24.167 DEBUG 21096 --- [rWorkerThread-1] c.a.s.sqs.MessageMD5ChecksumHandler      : Expected  MD5 of message body: 18ce2856addac8a08c394ce8fbd7d315
2022-04-07 21:59:24.167 TRACE 21096 --- [rWorkerThread-1] c.a.s.sqs.buffered.ReceiveQueueBuffer    : Queue http://localhost:4576/queue/upload-notifications now has 1 receive results cached 
2022-04-07 21:59:24.168 TRACE 21096 --- [rWorkerThread-1] c.a.s.sqs.buffered.ReceiveQueueBuffer    : Spawned receive batch #2 (1 of 10 inflight) for queue http://localhost:4576/queue/upload-notifications
No of messages in the result- 1
##########################After reading##########################
No of Messages - 0
No of Messages on Flight - 2
##################################################################

Solution - (Got a workaround. Eventhough i don't fully understand it)

I was using spring cloud aws dependencies and was using an @Autowired AmazonSQS instance to interact with the queue. That one by default, gets 10 messages into some kind of BufferQueue and then serves the messages one by one. That was the reason why all messages goes into flight and then processed one by one.

Was able to override that one by defining a simple bean.

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
    SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
    simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
    simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
    simpleMessageListenerContainer.setMaxNumberOfMessages(1);
    simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
    return simpleMessageListenerContainer;
}

And now it works in sync with withMaxNumberOfMessages(n) as expected


Solution

  • Solution - (Got a workaround. Eventhough i don't fully understand it)

    I was using spring cloud aws dependencies and was using an @Autowired AmazonSQS instance to interact with the queue. That one by default, gets 10 messages into some kind of BufferQueue and then serves the messages one by one. That was the reason why all messages goes into flight and then processed one by one.

    Was able to override that one by defining a simple bean.

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
        simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
        simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
        simpleMessageListenerContainer.setMaxNumberOfMessages(1);
        simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
        return simpleMessageListenerContainer;
    }
    

    And now it works in sync with withMaxNumberOfMessages(n) as expected