Search code examples
amazon-sqsspring-cloud-aws

Spring Cloud AWS Issue with setting manual acknowledge of SQS message


I'm trying to implement logic with manual deleting of AWS SQS message using spring-cloud-aws-messaging. This feature was implemented in scope of this ticket from the example in tests

@SqsListener(value = "queueName", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void listen(SqsEventDTO message, Acknowledgment acknowledgment) {

    LOGGER.info("Received message {}", message.getFoo());
    try {
        acknowledgment.acknowledge().get();
    } catch (InterruptedException e) {
        LOGGER.error("Opps", e);
    } catch (ExecutionException e) {
        LOGGER.error("Opps", e);
    }
}

But faced with the unexpected exception

com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance oforg.springframework.cloud.aws.messaging.listener.Acknowledgment(no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information

Solution with SqsMessageDeletionPolicy.ON_SUCCESS works but I want to avoid throwing an exception.

What have I missed in the configuration?


Solution

  • It took some fiddling around and trying different things from other SO answers.

    Here is my code and I'll try to explain as best I can. I'm including everything that I'm using for my SQS consumer.

    My config class is below. Only not-so-obvious thing to note below is the converter and resolver objects instantiated in the queueMessageHandlerFactory method. The MappingJackson2MessageConverter (in case it isn't obvious from the oh-so-obvious class name) class handles the deserialization of the payload from SQS.

    It's also important that the strict content type match be set to false.

    Also, the MappingJackson2MessageConverter allows you to set your own Jackson ObjectMapper, however if you do that you will need to configure it as follows:

    objectMapper.configure(MapperFeature.DEFAULT_VIEW_INCLUSION, false);
    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    

    You may not want to do that, so you can leave it null and it will create its own ObjectMapper.

    I think the rest of the code is pretty self-explanatory...? Let me know if not.

    One difference between our use-cases, it looks like you're mapping your own custom object (SqsEventDTO) and I assume that's working? In that case, I don't think you will need the MappingJackson2MessageConverter, but I could be wrong.

    @Configuration
    public class AppConfig {
    
        @Bean
        @Primary
        public QueueMessageHandler queueMessageHandler(@Autowired QueueMessageHandlerFactory queueMessageHandlerFactory) {
            return queueMessageHandlerFactory.createQueueMessageHandler();
        }
        
        @Bean
        @Primary
        public QueueMessageHandlerFactory queueMessageHandlerFactory(@Autowired AmazonSQSAsync sqsClient) {
    
            QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
            factory.setAmazonSqs(sqsClient);
            
            MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
            messageConverter.setSerializedPayloadClass(String.class);
            
            //set strict content type match to false
            messageConverter.setStrictContentTypeMatch(false);
            
            // Uses the MappingJackson2MessageConverter object to resolve/map 
            // the payload against the Message/S3EventNotification argument.
            PayloadArgumentResolver payloadResolver = new PayloadArgumentResolver(messageConverter);
    
            // Extract the acknowledgment data from the payload's headers, 
            // which then gets deserialized into the Acknowledgment object.  
            AcknowledgmentHandlerMethodArgumentResolver acknowledgmentResolver = new AcknowledgmentHandlerMethodArgumentResolver("Acknowledgment");
            
            // I don't remember the specifics of WHY, however there is 
            // something important about the order of the argument resolvers 
            // in the list
            factory.setArgumentResolvers(Arrays.asList(acknowledgmentResolver, payloadResolver));
        
            return factory;
        }
        
        @Bean("ConsumerBean")
        @Primary
        public SimpleMessageListenerContainer simpleMessageListenerContainer(@Autowired AmazonSQSAsync amazonSQSAsync, @Autowired QueueMessageHandler queueMessageHandler,
            @Autowired ThreadPoolTaskExecutor threadPoolExecutor) {
            
            SimpleMessageListenerContainer smlc = new SimpleMessageListenerContainer();
            smlc.setWaitTimeOut(20);
            smlc.setAmazonSqs(amazonSQSAsync);
            smlc.setMessageHandler(queueMessageHandler);
            smlc.setBeanName("ConsumerBean");
            smlc.setMaxNumberOfMessages(sqsMaxMessages);
            smlc.setTaskExecutor(threadPoolExecutor);
            
            return smlc;
        }
        
        @Bean
        @Primary
        public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            
            executor.setCorePoolSize(corePoolSize);
            executor.setAllowCoreThreadTimeOut(coreThreadsTimeout);
            executor.setWaitForTasksToCompleteOnShutdown(true);
            executor.setMaxPoolSize(maxPoolSize);
            executor.setKeepAliveSeconds(threadTimeoutSeconds);
            executor.setThreadNamePrefix(threadName);
            executor.initialize();
            
            return executor;
        }
    }
    

    My SQS consumer Service class is below.

        @Service
        public class RawConsumer {
    
        @SqsListener(deletionPolicy = SqsMessageDeletionPolicy.NEVER, value = "${input.sqs.queuename}")
        public void sqsListener(S3EventNotification event, Acknowledgment ack) throws Exception {
            // Handle event here
        }
    

    I hope that helps, let me know if you have any issues.