Tags: spring-cloud-stream, spring-cloud-stream-binder-kafka

Messages skipped by a reactive Spring Cloud Stream Kafka consumer


I have a Spring Cloud Stream application that uses Spring's reactive support (Reactor) to listen to two topics, each with 10 partitions.

In the consumer I simply read each message and log its topic, partition and offset. Some messages are never read.

I have tried both auto commit and manual acknowledgement.

Test setup: I pushed 30,000 messages into each of Topic1 and Topic2 and then started the application. It read only 59,999 records instead of 60,000.

The lag on every partition of both topics is 0, which suggests that all records were consumed.
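Why can the lag be 0 while a record is missing? Consumer lag is computed purely from offsets: for each partition it is the log-end offset minus the group's committed offset, and the committed offset is a position (the next offset to read), not a per-record receipt. The class below is a minimal sketch of that arithmetic; the name `ConsumerLag` and its methods are hypothetical and not part of any Kafka or Spring API.

```java
/**
 * Minimal sketch of how consumer lag is derived. For one partition,
 * lag = log-end offset - committed offset, where the committed offset is the
 * position of the NEXT record the group will read (last processed + 1).
 */
final class ConsumerLag {

    private ConsumerLag() {
    }

    /** Lag for a single partition. */
    static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    /** Total lag for a topic; both arrays are indexed by partition number. */
    static long totalLag(long[] logEndOffsets, long[] committedOffsets) {
        if (logEndOffsets.length != committedOffsets.length) {
            throw new IllegalArgumentException("expected one entry per partition");
        }
        long total = 0L;
        for (int p = 0; p < logEndOffsets.length; p++) {
            total += lag(logEndOffsets[p], committedOffsets[p]);
        }
        return total;
    }
}
```

With hypothetical values where the committed position has advanced to the end of the partition (say both are 76033), `ConsumerLag.lag(76033, 76033)` is 0 regardless of whether offset 76031 ever reached application code, so a lag of 0 does not by itself prove that every record was processed.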

Receiver code:
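    // Assumed imports: java.util.function.Consumer, reactor.core.publisher.Flux,
    // org.springframework.messaging.Message, org.springframework.kafka.support.KafkaHeaders,
    // plus a static import of String.format. log.startstate(...) appears to be a custom log level.
    @Bean
    public Consumer<Flux<Message<String>>> receiver() {
        return sink -> {
            sink
                .doOnNext(record -> {
                    String topic = (String) record.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
                    Long offset = (Long) record.getHeaders().get(KafkaHeaders.OFFSET);
                    Integer partition = (Integer) record.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID);
                    log.startstate(() -> format("Register Topic %s partition %d offset %d", topic, partition, offset));
                })
                // A reactive Consumer returns nothing the framework can subscribe to,
                // so the application subscribes to the Flux itself.
                .subscribe();
        };
    }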
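Counting log lines can be misleading if a record is redelivered (the binding is configured with `max-attempts: 5`). A minimal sketch, assuming a hypothetical helper class `RecordAudit` (not part of Spring Cloud Stream), counts distinct (topic, partition, offset) triples instead; it could be called from the `doOnNext(...)` block with the three header values the receiver already extracts.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical audit helper: remembers each distinct (topic, partition, offset)
 * triple, so a redelivered record is not counted twice when comparing the total
 * against the number of records that were produced.
 */
final class RecordAudit {

    // List.of(...) gives value-based equals/hashCode, so each triple is a safe set key.
    private final Set<List<Object>> seen = new HashSet<>();

    /** Returns true the first time a triple is recorded, false for repeats. */
    synchronized boolean record(String topic, int partition, long offset) {
        return seen.add(List.<Object>of(topic, partition, offset));
    }

    /** Number of distinct records observed so far. */
    synchronized int distinctCount() {
        return seen.size();
    }
}
```

After the run, `distinctCount()` can be compared with the 60,000 records produced; a shortfall then points at records that were never delivered rather than at duplicates masking a loss.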

My application.yml contains the following:

spring:
  cloud:
    stream:
      kafka:
        default:
          consumer:
            autoCommitOffset: true
        bindings:
          receiver-in-0:
            consumer:
              autoCommitOffset: true
        binder:
          brokers: localhost:9092
          autoAddPartitions: true
          minPartitionCount: 10
          auto-offset-reset: earliest
      bindings:
        receiver-in-0:
          binder: kafka
          destination: Topic1,Topic2
          content-type: text/plain;charset=UTF-8
          group: input-group-1
          max-attempts: 5
          back-off-initial-interval: 10000
          back-off-max-interval: 30000
        emitter-out-0:
          binder: kafka
          producer:
            partition-count:  2
            partition-key-extractor-name: EmitterPartitionKey
        erroremitter-out-0:
          binder: kafka
          destination: error
        error:
          binder: kafka
          destination: error

spring.cloud.stream.function.definition: receiver;emitter;erroremitter

My log shows that the consumer never read Topic1, partition 2, offset 76031: it jumped from offset 76030 straight to 76032.

[STARTSTATE] 2020-05-22 11:40:01.033 [KafkaConsumerDestination{consumerDestinationName='Topic1', partitions=10, dlqName='null'}.container-0-C-1][52] CloudConsumer - Register Topic Topic1 partition 2 offset 76030
[STARTSTATE] 2020-05-22 11:40:01.034 [KafkaConsumerDestination{consumerDestinationName='Topic1', partitions=10, dlqName='null'}.container-0-C-1][52] CloudConsumer - Register Topic Topic2 partition 7 offset 86149
[STARTSTATE] 2020-05-22 11:40:01.034 [KafkaConsumerDestination{consumerDestinationName='Topic1', partitions=10, dlqName='null'}.container-0-C-1][52] CloudConsumer - Register Topic Topic2 partition 7 offset 86150
[STARTSTATE] 2020-05-22 11:40:01.034 [KafkaConsumerDestination{consumerDestinationName='Topic1', partitions=10, dlqName='null'}.container-0-C-1][52] CloudConsumer - Register Topic Topic1 partition 2 offset 76032
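Spotting such a jump by eye is hard with 60,000 lines. The sketch below, built around a hypothetical `OffsetGapFinder` class, scans lines containing the receiver's "Register Topic ... partition ... offset ..." message and reports, per topic-partition, every offset missing between the lowest and highest seen. It assumes offsets should be contiguous; that is not guaranteed in general (log compaction and transactional commit markers also leave gaps), so a reported gap is a lead to check, not proof of a lost record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class OffsetGapFinder {

    // Matches the message text produced by the receiver's format(...) call.
    private static final Pattern LINE =
            Pattern.compile("Register Topic (\\S+) partition (\\d+) offset (\\d+)");

    private OffsetGapFinder() {
    }

    /** Returns "topic/partition" -> offsets missing between the lowest and highest seen. */
    static Map<String, List<Long>> findGaps(List<String> lines) {
        Map<String, TreeSet<Long>> seen = new TreeMap<>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line);
            if (m.find()) {
                String key = m.group(1) + "/" + m.group(2);
                seen.computeIfAbsent(key, k -> new TreeSet<>()).add(Long.parseLong(m.group(3)));
            }
        }
        Map<String, List<Long>> gaps = new TreeMap<>();
        for (Map.Entry<String, TreeSet<Long>> entry : seen.entrySet()) {
            List<Long> missing = new ArrayList<>();
            long previous = entry.getValue().first();
            // Walk the sorted offsets and collect every value skipped between neighbours.
            for (long offset : entry.getValue().tailSet(previous, false)) {
                for (long o = previous + 1; o < offset; o++) {
                    missing.add(o);
                }
                previous = offset;
            }
            if (!missing.isEmpty()) {
                gaps.put(entry.getKey(), missing);
            }
        }
        return gaps;
    }
}
```

Applied to the four log lines above, it reports `Topic1/2 -> [76031]` and no gap for Topic2, partition 7.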

Relevant sections of the pom.xml:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>11</java.version>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <spring-boot.version>2.1.7.RELEASE</spring-boot.version>
    <spring-cloud.version>Hoxton.SR4</spring-cloud.version>
    <skipTests>true</skipTests>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        <version>3.0.4.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-reactive</artifactId>
        <version>2.2.1.RELEASE</version>
    </dependency>
</dependencies>



Solution

  • I reproduced your issue and will open an issue against Spring Cloud Stream (SCSt); I'm not yet sure whether it is a Stream problem or a Reactor problem.

    I don't see any missing records when Reactor is not involved, i.e. with a plain (non-reactive) consumer:

    Consumer<Message<String>>
    

    Issue: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/906