Tags: java, spring-boot, maven, apache-kafka, spring-cloud-stream

Consume Batches through Kafka in Spring Boot


I'm new to Kafka and want to enable batch processing in the consumer.

I read through the documentation and found that, starting with version 3.0, batch processing can be enabled.

We are currently using Spring Boot 2.1.3.RELEASE and the following Kafka dependencies:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Greenwich.SR3</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

What changes do I need to make in pom.xml before starting on the properties and code changes? Do I need to change the Spring Boot version?


Solution

  • You can consume records as a batch with @StreamListener; you just need to provide a deserializer. Example:

    public class Person {
    
        private String name;
        private String surname;
        // ...
    }
    
    
    @StreamListener(value = PersonStream.INPUT)
    public void personBulkReceiver(List<Person> persons) {
        System.out.println("personBulkReceiver : " + persons.size());
    }
    
    
    spring:
      cloud:
        stream:
          binders:
            bulkKafka:
              type: kafka
              environment:
                spring:
                  cloud:
                    stream:
                      kafka:
                        binder:
                          brokers: localhost:9092
                          configuration:
                            max.poll.records: 1500
                            fetch.min.bytes: 1000000
                            fetch.max.wait.ms: 10000
                            value.deserializer: tr.cloud.stream.examples.PersonDeserializer
          bindings:
            person-topic-in:
              binder: bulkKafka
              destination: person-topic
              contentType: application/person
              group: omercelik
              consumer:
                batch-mode: true
    
    import org.springframework.kafka.support.serializer.JsonDeserializer;
    
    // An empty subclass is sufficient here: spring-kafka's JsonDeserializer
    // can infer the target type (Person) from the generic superclass.
    public class PersonDeserializer extends JsonDeserializer<Person> {
    }