I am checking this guide on using Quarkus to consume from SQS.
The thing is I want to do it on an endless loop, fetching new messages every 10 seconds for example, and inserting some data from the message in a database using Hibernate Reactive.
I created a Quarkus Scheduler, but since it does not support returning a Uni, I had to block the response from Hibernate reactive, and got this error
2022-02-16 15:01:24,058 ERROR [de.sup.tea.con.SqsConsumer] (vert.x-eventloop-thread-9) Finished with error!: io.smallrye.mutiny.CompositeException: Multiple exceptions caught:
[Exception 0] io.vertx.core.impl.NoStackTraceThrowable: Timeout
[Exception 1] java.lang.IllegalStateException: HR000061: Session is currently connecting to database
How is the best way to achieve what I need using Quarkus and reactive?
Since Quarkus Scheduler is not in the I/O thread, it's not possible to use hibernate reactive. So, to make it work, you can work together with the EventBus. Below is a fully functional example. Code inside processReceivedMessageResponse method runs in the I/O thread and can depend on Hibernate Reactive.
import io.quarkus.scheduler.Scheduled
import io.quarkus.vertx.ConsumeEvent
import io.smallrye.mutiny.Uni
import io.vertx.mutiny.core.eventbus.EventBus
import org.eclipse.microprofile.config.inject.ConfigProperty
import org.jboss.logging.Logger
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.Message
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse
import java.util.concurrent.CompletionStage
import javax.enterprise.context.ApplicationScoped
import javax.enterprise.inject.Instance
@ApplicationScoped
class SqsConsumer(
private val eventBus: EventBus,
private val logger: Logger,
@ConfigProperty(name = "sqs.consumer.maxFetchedMessages")
private val maxFetchedEvents: Int,
private val handlers: Instance<MessageHandler>,
private val sqsClient: SqsAsyncClient,
) {
@Scheduled(every = "{sqs.consumer.interval}")
fun execute() {
handlers.stream().forEach { handler ->
val handlerName = handler.javaClass.name
logger.info("Fetching messages for $handlerName...")
Uni
.createFrom()
.completionStage(fetchMessages(handler.queueUrl()))
.subscribe()
.with(
{ response ->
val newEventsCount = response.messages().size
if (newEventsCount > 0) {
logger.info("$newEventsCount message(s) fetched for $handlerName.")
eventBus.send("receive-message-responses", ResponseHolder(handler, response))
} else {
logger.info("Queue was empty. Maybe next time.")
}
},
{ logger.error("Error fetching messages!", it) }
)
}
}
@ConsumeEvent("receive-message-responses")
fun processReceivedMessageResponse(holder: ResponseHolder): Uni<Void> {
val handler = holder.handler
val handlerName = handler.javaClass.name
val messageResponse = holder.receiveMessageResponse
logger.info("Processing messages for $handlerName...")
return Uni
.createFrom()
.item(holder)
.flatMap { handler.process(messageResponse.messages().map { message -> message.body() }) }
.onItem()
.invoke { _ ->
logger.info("Processing succeeded. Deleting processed events from the queue...")
messageResponse
.messages()
.forEach { eventBus.send("processed-messages", MessageHolder(handler, it)) }
}
.replaceWithVoid()
.onFailure()
.invoke { it -> logger.error("Error processing messages!", it) }
}
@ConsumeEvent("processed-messages")
fun deleteProcessedMessages(holder: MessageHolder): Uni<Void> {
val handler = holder.handler
val message = holder.message
return Uni
.createFrom()
.completionStage(
sqsClient.deleteMessage {
it
.queueUrl(handler.queueUrl())
.receiptHandle(message.receiptHandle())
}
)
.onItem()
.invoke { _ -> logger.info("Message ${message.messageId()} deleted from the queue!") }
.onFailure()
.invoke { it -> logger.error("Could not delete message ${message.messageId()} from the queue!", it) }
.replaceWithVoid()
}
private fun fetchMessages(queueUrl: String): CompletionStage<ReceiveMessageResponse> {
return sqsClient
.receiveMessage {
it
.maxNumberOfMessages(maxFetchedEvents)
.queueUrl(queueUrl)
}
}
}
class ResponseHolder(
val handler: MessageHandler,
val receiveMessageResponse: ReceiveMessageResponse,
)
class MessageHolder(
val handler: MessageHandler,
val message: Message,
)