I am using Spring Cloud AWS to consume messages from Amazon Simple Queue Service (SQS). I have the following configuration for parallel processing:
@Bean
public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() {
    SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
    simpleAsyncTaskExecutor.setConcurrencyLimit(50);
    return simpleAsyncTaskExecutor;
}

@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(
        SimpleAsyncTaskExecutor simpleAsyncTaskExecutor) {
    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAutoStartup(true);
    factory.setTaskExecutor(simpleAsyncTaskExecutor);
    factory.setWaitTimeOut(20);
    factory.setMaxNumberOfMessages(10);
    return factory;
}
I need to process 50 messages in parallel on 50 threads (the limit configured in the SimpleAsyncTaskExecutor bean), but only 10 messages are processed in parallel (the maxNumberOfMessages returned by SQS per request).
How can I process 50 messages instead of 10?
I found the solution.
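It's necessary to annotate the listener method with @Async (which only takes effect when async support is enabled, for example with @EnableAsync), change deletionPolicy to NEVER, and delete the message yourself once processing has finished.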
This way, consumption from the queue respects the configured number of threads. For example, with 50 threads the listener makes 5 requests to the SQS queue (10 messages per request), so a total of 50 messages are processed in parallel.
The code looks like this:
@Async
@SqsListener(value = "sqsName", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void consume(String message, Acknowledgment acknowledgment) throws InterruptedException, ExecutionException {
    // your code
    acknowledgment.acknowledge().get(); // delete the message from the queue
}
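To illustrate why handing the work off raises the limit, here is a rough plain-Java model of a batch poller. It is only a sketch under my own assumptions, not the actual Spring Cloud AWS code: I am assuming the container waits for every handler in a batch to return before it polls again, so a handler that returns immediately (as an @Async method does) lets the next batch start while earlier messages are still being processed. The class PollerSimulation, the method peakConcurrency, and the 100 ms sleep are all made up for the illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a batch poller; no Spring or AWS classes are involved.
class PollerSimulation {

    /**
     * Pushes totalMessages simulated messages through one poller loop and
     * returns the highest number of handlers seen running at the same time.
     * handOff = false: the poller waits for each batch to finish (assumed default behaviour).
     * handOff = true:  the poller moves on immediately (what @Async achieves in this model).
     */
    static int peakConcurrency(boolean handOff, int poolSize, int batchSize, int totalMessages) {
        ExecutorService workers = Executors.newFixedThreadPool(poolSize);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            for (int sent = 0; sent < totalMessages; sent += batchSize) {
                int batch = Math.min(batchSize, totalMessages - sent);
                CountDownLatch batchDone = new CountDownLatch(batch);
                for (int i = 0; i < batch; i++) {
                    workers.execute(() -> {
                        // Record how many handlers are running right now.
                        int now = active.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        try {
                            Thread.sleep(100); // stand-in for real message handling
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } finally {
                            active.decrementAndGet();
                            batchDone.countDown();
                        }
                    });
                }
                if (!handOff) {
                    batchDone.await(); // do not "poll" again until the whole batch is done
                }
            }
            workers.shutdown();
            workers.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("simulation interrupted", e);
        } finally {
            workers.shutdownNow();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("waiting per batch: " + peakConcurrency(false, 50, 10, 50));
        System.out.println("handing off:       " + peakConcurrency(true, 50, 10, 50));
    }
}
```

In this model the first figure can never exceed the batch size of 10, while the second can climb up to the pool size of 50; the exact peak depends on thread scheduling. Because the handler now returns before the work is done, the message must not be deleted automatically, which is why the listener above uses deletionPolicy NEVER and acknowledges at the end.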