Tags: transactions, spring-kafka, spring-cloud-stream, spring-cloud-stream-binder-kafka

Spring Cloud Stream Kafka transaction configuration


I am following this template for Spring Cloud Stream Kafka but got stuck while making the producer method transactional. I have not used Kafka before, so I need help in case any configuration changes are needed on the Kafka side.

It works well when no transactional configuration is added, but as soon as the transactional configuration is added it times out at startup:

2020-11-21 15:07:55.349 ERROR 20432 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Failed to obtain partition information
org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

Below is my setup for Spring Cloud Stream.

pom.xml

<properties>
    <java.version>11</java.version>
    <spring-boot.version>2.3.3.RELEASE</spring-boot.version>
    <spring-cloud.version>Hoxton.SR8</spring-cloud.version>
    <kafka-avro-serializer.version>5.2.1</kafka-avro-serializer.version>
    <avro.version>1.8.2</avro.version>
</properties>

<dependencyManagement>
    <dependencies>
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-dependencies</artifactId>
          <version>${spring-boot.version}</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

transactionManager

@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders) {
    // Obtain the binder's transactional producer factory and expose it through a
    // KafkaTransactionManager so @Transactional producer methods can participate in it
    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder("kafka",
            MessageChannel.class)).getTransactionalProducerFactory();
    return new KafkaTransactionManager<>(pf);
}
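
For reference, here is a minimal sketch of a producer-only transactional send that would use the transaction manager above, assuming the annotation-based @EnableBinding(Source.class) programming model available with Hoxton.SR8; EmployeeProducer and its send method are illustrative names (not from the original project), and the payload would typically be the Avro-generated type:

import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class EmployeeProducer {

    private final Source source;

    public EmployeeProducer(Source source) {
        this.source = source;
    }

    // Runs inside a Kafka transaction started by the KafkaTransactionManager bean;
    // the outbound record is committed only if this method returns normally.
    @Transactional
    public void send(Object employee) {
        source.output().send(MessageBuilder.withPayload(employee).build());
    }
}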

application.yml

spring:
  cloud:
    stream:
      default: 
         producer: 
          useNativeEncoding: true
         consumer:  
          useNativeEncoding: true     
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
          group: group-1
          concurrency: 3
        output:
          destination: employee-details
          content-type: application/*+avro
      kafka:
          binder:        
           producer-properties:
             key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
             value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
             schema.registry.url: http://localhost:8081
             acks: all
             max.block.ms: 60000             
           consumer-properties:
             key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
             value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
             schema.registry.url: http://localhost:8081
             specific.avro.reader: true
           transaction:
            transactionIdPrefix: tx-
           producer:
             enable:
               idempotence: true
#             requiredAcks: all
           brokers:
           - localhost:9094

I am running Kafka in Minikube; below is the configuration for my topic:

[2020-11-21 06:18:21,655] INFO [ZooKeeperClient] Connected. (kafka.zookeeper.ZooKeeperClient)
Topic: employee-details PartitionCount: 1       ReplicationFactor: 1    Configs:
        Topic: employee-details Partition: 0    Leader: 0       Replicas: 0     Isr: 0

Logs from the Kafka controller:

TRACE [Controller id=0] Checking need to trigger auto leader balancing (kafka.controller.KafkaController)
[2020-11-24 06:56:21,379] DEBUG [Controller id=0] Topics not in preferred replica for broker 0 Map() (kafka.controller.KafkaController)
[2020-11-24 06:56:21,379] TRACE [Controller id=0] Leader imbalance ratio for broker 0 is 0.0 (kafka.controller.KafkaController)

Solution

  • Look at the Kafka broker's server log.

    Transactional producers will time out if there are fewer replicas of the transaction state log than required. By default 3 replicas are required and a minimum of 2 need to be in sync.

    See transaction.state.log.replication.factor and transaction.state.log.min.isr.
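
    For a single-broker setup such as the one-node Kafka in Minikube above, a common workaround for development is to lower those broker settings (for example in server.properties, or via the matching environment variables of the Kafka image) so the transaction state log can be created with a single replica:

    # dev/test only: a one-node cluster cannot host 3 replicas
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1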