I'm trying to implement logic with manual deleting of AWS SQS message using spring-cloud-aws-messaging. This feature was implemented in scope of this ticket from the example in tests
@SqsListener(value = "queueName", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void listen(SqsEventDTO message, Acknowledgment acknowledgment) {
LOGGER.info("Received message {}", message.getFoo());
try {
acknowledgment.acknowledge().get();
} catch (InterruptedException e) {
LOGGER.error("Opps", e);
} catch (ExecutionException e) {
LOGGER.error("Opps", e);
}
}
But faced with the unexpected exception
com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of
org.springframework.cloud.aws.messaging.listener.Acknowledgment(no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
Solution with SqsMessageDeletionPolicy.ON_SUCCESS
works but I want to avoid throwing an exception.
What have I missed in the configuration?
It took some fiddling around and trying different things from other SO answers.
Here is my code and I'll try to explain as best I can. I'm including everything that I'm using for my SQS consumer.
My config class is below. Only not-so-obvious thing to note below is the converter and resolver objects instantiated in the queueMessageHandlerFactory method. The MappingJackson2MessageConverter (in case it isn't obvious from the oh-so-obvious class name) class handles the deserialization of the payload from SQS.
It's also important that the strict content type match be set to false.
Also, the MappingJackson2MessageConverter allows you to set your own Jackson ObjectMapper, however if you do that you will need to configure it as follows:
objectMapper.configure(MapperFeature.DEFAULT_VIEW_INCLUSION, false);
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
You may not want to do that, so you can leave it null and it will create its own ObjectMapper.
I think the rest of the code is pretty self-explanatory...? Let me know if not.
One difference between our use-cases, it looks like you're mapping your own custom object (SqsEventDTO) and I assume that's working? In that case, I don't think you will need the MappingJackson2MessageConverter, but I could be wrong.
@Configuration
public class AppConfig {
@Bean
@Primary
public QueueMessageHandler queueMessageHandler(@Autowired QueueMessageHandlerFactory queueMessageHandlerFactory) {
return queueMessageHandlerFactory.createQueueMessageHandler();
}
@Bean
@Primary
public QueueMessageHandlerFactory queueMessageHandlerFactory(@Autowired AmazonSQSAsync sqsClient) {
QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
factory.setAmazonSqs(sqsClient);
MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
messageConverter.setSerializedPayloadClass(String.class);
//set strict content type match to false
messageConverter.setStrictContentTypeMatch(false);
// Uses the MappingJackson2MessageConverter object to resolve/map
// the payload against the Message/S3EventNotification argument.
PayloadArgumentResolver payloadResolver = new PayloadArgumentResolver(messageConverter);
// Extract the acknowledgment data from the payload's headers,
// which then gets deserialized into the Acknowledgment object.
AcknowledgmentHandlerMethodArgumentResolver acknowledgmentResolver = new AcknowledgmentHandlerMethodArgumentResolver("Acknowledgment");
// I don't remember the specifics of WHY, however there is
// something important about the order of the argument resolvers
// in the list
factory.setArgumentResolvers(Arrays.asList(acknowledgmentResolver, payloadResolver));
return factory;
}
@Bean("ConsumerBean")
@Primary
public SimpleMessageListenerContainer simpleMessageListenerContainer(@Autowired AmazonSQSAsync amazonSQSAsync, @Autowired QueueMessageHandler queueMessageHandler,
@Autowired ThreadPoolTaskExecutor threadPoolExecutor) {
SimpleMessageListenerContainer smlc = new SimpleMessageListenerContainer();
smlc.setWaitTimeOut(20);
smlc.setAmazonSqs(amazonSQSAsync);
smlc.setMessageHandler(queueMessageHandler);
smlc.setBeanName("ConsumerBean");
smlc.setMaxNumberOfMessages(sqsMaxMessages);
smlc.setTaskExecutor(threadPoolExecutor);
return smlc;
}
@Bean
@Primary
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setAllowCoreThreadTimeOut(coreThreadsTimeout);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setMaxPoolSize(maxPoolSize);
executor.setKeepAliveSeconds(threadTimeoutSeconds);
executor.setThreadNamePrefix(threadName);
executor.initialize();
return executor;
}
}
My SQS consumer Service class is below.
@Service
public class RawConsumer {
@SqsListener(deletionPolicy = SqsMessageDeletionPolicy.NEVER, value = "${input.sqs.queuename}")
public void sqsListener(S3EventNotification event, Acknowledgment ack) throws Exception {
// Handle event here
}
I hope that helps, let me know if you have any issues.