Search code examples
apache-kafkakafka-consumer-apispring-kafkaspring-cloud-stream-binder-kafka

Springboot kafka consumer not receiving messages after another has already received the message


I am new to KAFKA and would like help

I have 2 applications (springboot), they are identical/copies only with different ports. http://localhost:8080/ http://localhost:8081/

They are both consumers

the two listen to the topic XXX

I have another APP that plays the role of producer.

whenever I send something to topic XXX.

only one of them consumes the message and the other does not.

I have tested both individually and they listen normally if they are listening alone, but if they are listening together only one of them will listen.

I'm using

         <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            <version>3.0.0.RELEASE</version>
        </dependency>
spring.cloud.stream.kafka.binder.autoCreateTopics=true
spring.cloud.stream.kafka.binder.headers=type
spring.cloud.stream.kafka.default.consumer.ackEachRecord=true
spring.cloud.stream.kafka.default.consumer.enableDlq=true
spring.cloud.stream.kafka.default.consumer.standardHeaders=both
spring.cloud.stream.kafka.default.consumer.dlqName=api***.d**.api***

my listener


@StreamListener(target=SomeString.TOPIC, condition = "headers['type']=='***' or headers['type']=='***'")
    public void handle(GenericMessage<String> message) throws BusinessException {
   
***
}

Solution

  • The Apache Kafka Binder implementation maps each destination to an Apache Kafka topic. The consumer group maps directly to the same Apache Kafka concept. Partitioning also maps directly to Apache Kafka partitions as well.

    check kafka consumer group behavior -> https://kafka.apache.org/documentation/#consumerconfigs_group.id

    if kafka consumers have same groupId just one of listen message, you should give them different groupId

    app in 8080 -> spring.cloud.stream.bindings.<channelName>.group=consumer-group-1

    app in 8081 -> spring.cloud.stream.bindings.<channelName>.group=consumer-group-2