Search code examples
apache-kafkaquarkussmallrye-reactive-messaging

Quarkus Kafka - Batch/Bulk message consumer


I want to batch process. In my use case send kafka producer messages are sent one by one. I want to read them as a list in the consumer application. I can do that at the Spring Kafka library. Spring Kafka batch listener

Is there any way to do this with the quarkus-smallrye-reactive-messaging-kafka library?

I tried the example below but got an error.

ERROR [io.sma.rea.mes.provider] (vert.x-eventloop-thread-3) SRMSG00200: The method org.MyConsumer#aggregate has thrown an exception: java.lang.ClassCastException: class org.TestConsumer cannot be cast to class io.smallrye.mutiny.Multi (org.TestConsumer is in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @6f2c0754; io.smallrye.mutiny.Multi is in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @4c1638b)

application.properties:

kafka.bootstrap.servers=hosts
mp.messaging.connector.smallrye-kafka.group.id=KafkaQuick
mp.messaging.connector.smallrye-kafka.auto.offset.reset=earliest
mp.messaging.incoming.test-consumer.connector=smallrye-kafka
mp.messaging.incoming.test-consumer.value.deserializer=org.TestConsumerDeserializer

TestConsumerDeserializer:

public class TestConsumerDeserializer extends JsonbDeserializer<TestConsumer>{
    public TestConsumerDeserializer(){
         // pass the class to the parent.
         super(TestConsumer.class);
    }
}  

MyConsumer:

@ApplicationScoped
public class MyConsumer {
    
    @Incoming("test-consumer")
    //@Outgoing("aggregated-channel")
    public void aggregate(Multi<Message<TestConsumer>> in) {
        System.out.println(in);
    }
}

Solution

  • Batch support has been added to the Quarkus Kafka connector. See https://quarkus.io/guides/kafka#receiving-kafka-records-in-batches.