Search code examples
amazon-sqsquarkus

Quarkus SQS consumer


I am checking this guide on using Quarkus to consume from SQS.

The thing is I want to do it on an endless loop, fetching new messages every 10 seconds for example, and inserting some data from the message in a database using Hibernate Reactive.

I created a Quarkus Scheduler, but since it does not support returning a Uni, I had to block the response from Hibernate reactive, and got this error

2022-02-16 15:01:24,058 ERROR [de.sup.tea.con.SqsConsumer] (vert.x-eventloop-thread-9) Finished with error!: io.smallrye.mutiny.CompositeException: Multiple exceptions caught:
    [Exception 0] io.vertx.core.impl.NoStackTraceThrowable: Timeout
    [Exception 1] java.lang.IllegalStateException: HR000061: Session is currently connecting to database

How is the best way to achieve what I need using Quarkus and reactive?


Solution

  • Since Quarkus Scheduler is not in the I/O thread, it's not possible to use hibernate reactive. So, to make it work, you can work together with the EventBus. Below is a fully functional example. Code inside processReceivedMessageResponse method runs in the I/O thread and can depend on Hibernate Reactive.

    import io.quarkus.scheduler.Scheduled
    import io.quarkus.vertx.ConsumeEvent
    import io.smallrye.mutiny.Uni
    import io.vertx.mutiny.core.eventbus.EventBus
    import org.eclipse.microprofile.config.inject.ConfigProperty
    import org.jboss.logging.Logger
    import software.amazon.awssdk.services.sqs.SqsAsyncClient
    import software.amazon.awssdk.services.sqs.model.Message
    import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse
    import java.util.concurrent.CompletionStage
    import javax.enterprise.context.ApplicationScoped
    import javax.enterprise.inject.Instance
    
    @ApplicationScoped
    class SqsConsumer(
        private val eventBus: EventBus,
        private val logger: Logger,
        @ConfigProperty(name = "sqs.consumer.maxFetchedMessages")
        private val maxFetchedEvents: Int,
        private val handlers: Instance<MessageHandler>,
        private val sqsClient: SqsAsyncClient,
    ) {
    
        @Scheduled(every = "{sqs.consumer.interval}")
        fun execute() {
            handlers.stream().forEach { handler ->
                val handlerName = handler.javaClass.name
                logger.info("Fetching messages for $handlerName...")
                Uni
                    .createFrom()
                    .completionStage(fetchMessages(handler.queueUrl()))
                    .subscribe()
                    .with(
                        { response ->
                            val newEventsCount = response.messages().size
                            if (newEventsCount > 0) {
                                logger.info("$newEventsCount message(s) fetched for $handlerName.")
                                eventBus.send("receive-message-responses", ResponseHolder(handler, response))
                            } else {
                                logger.info("Queue was empty. Maybe next time.")
                            }
                        },
                        { logger.error("Error fetching messages!", it) }
                    )
            }
        }
    
        @ConsumeEvent("receive-message-responses")
        fun processReceivedMessageResponse(holder: ResponseHolder): Uni<Void> {
            val handler = holder.handler
            val handlerName = handler.javaClass.name
            val messageResponse = holder.receiveMessageResponse
            logger.info("Processing messages for $handlerName...")
            return Uni
                .createFrom()
                .item(holder)
                .flatMap { handler.process(messageResponse.messages().map { message -> message.body() }) }
                .onItem()
                .invoke { _ ->
                    logger.info("Processing succeeded. Deleting processed events from the queue...")
                    messageResponse
                        .messages()
                        .forEach { eventBus.send("processed-messages", MessageHolder(handler, it)) }
                }
                .replaceWithVoid()
                .onFailure()
                .invoke { it -> logger.error("Error processing messages!", it) }
        }
    
        @ConsumeEvent("processed-messages")
        fun deleteProcessedMessages(holder: MessageHolder): Uni<Void> {
            val handler = holder.handler
            val message = holder.message
            return Uni
                .createFrom()
                .completionStage(
                    sqsClient.deleteMessage {
                        it
                            .queueUrl(handler.queueUrl())
                            .receiptHandle(message.receiptHandle())
                    }
                )
                .onItem()
                .invoke { _ -> logger.info("Message ${message.messageId()} deleted from the queue!") }
                .onFailure()
                .invoke { it -> logger.error("Could not delete message ${message.messageId()} from the queue!", it) }
                .replaceWithVoid()
        }
    
        private fun fetchMessages(queueUrl: String): CompletionStage<ReceiveMessageResponse> {
            return sqsClient
                .receiveMessage {
                    it
                        .maxNumberOfMessages(maxFetchedEvents)
                        .queueUrl(queueUrl)
                }
        }
    }
    
    class ResponseHolder(
        val handler: MessageHandler,
        val receiveMessageResponse: ReceiveMessageResponse,
    )
    
    class MessageHolder(
        val handler: MessageHandler,
        val message: Message,
    )