Tags: kafka-consumer-api, kafka-producer-api, spring-cloud-stream

Spring Cloud Stream for Kafka with consumer/producer API: exactly-once semantics with transaction-id-prefix is not working as expected


I have a scenario where I am seeing different behavior, with a total of 3 different services:

  • The first service listens on a Solace queue and produces the messages to Kafka topic-1 (with transactions enabled).
  • The second service listens on Kafka topic-1 and writes to another Kafka topic-2 (no manual commits, transactions enabled for producing to the other topic, auto offset commit set to false & isolation.level set to read_committed).
  • The third service listens on Kafka topic-2 and writes back to a Solace queue (no manual commits, auto offset commit set to false & isolation.level set to read_committed).

Now the issue: after I enabled transactions and the isolation level in the second service, I am not able to read any messages; if I disable transactions in the second service, I am able to read all the messages.

  • Can we have both transactions and the isolation level enabled in one single service?
  • How does it work if my service is just a producer or just a consumer (how is EoS guaranteed for those services)?

Edit: below is how my YAML looks:

kafka:
  binder:
    transaction:
      transaction-id-prefix:
    brokers:
    configuration:
      # all my consumer properties (ssl, sasl)

Updated (YAML with Spring Cloud Stream):

spring: 
  cloud.stream:
      bindings:
        input:
          destination: test_input
          content-type: application/json
          group: test_group
        output:
          destination: test_output
          content-type: application/json
      kafka.binder: 
          configuration: 
            isolation.level: read_committed
            security.protocol: SASL_SSL
            sasl.mechanism: GSSAPI
            sasl.kerberos.service.name: kafka
            ssl.truststore.location: jks
            ssl.truststore.password: 
            ssl.endpoint.identification.algorithm: null            
          brokers: broker1:9092,broker2:9092,broker3:9092
          auto-create-topics: false
          transaction:
            transaction-id-prefix: trans-2
            producer:
              configuration:
                retries: 2000
                acks: all
                security.protocol: SASL_SSL
                sasl.mechanism: GSSAPI
                sasl.kerberos.service.name: kafka
                ssl.truststore.location: jks
                ssl.truststore.password: 
                ssl.endpoint.identification.algorithm: null

Updated (YAML with Spring Kafka):

spring:
  kafka:
    bootstrap-servers: broker1:9092,broker2:9092,broker3:9092
    consumer:
      properties:
        isolation.level: read_committed
        ssl.truststore.location: truststore.jks
        ssl.truststore.password: 
        security.protocol: SASL_SSL
        sasl.mechanism: GSSAPI
        sasl.kerberos.service.name: kafka
    producer:
      transaction-id-prefix: trans-2
      retries: 2000
      acks: all
      properties:
        ssl.truststore.location: truststore.jks
        ssl.truststore.password: 
        security.protocol: SASL_SSL
        sasl.mechanism: GSSAPI
        sasl.kerberos.service.name: kafka
    admin:
      properties:
        ssl.truststore.location: truststore.jks
        ssl.truststore.password: 
        security.protocol: SASL_SSL
        sasl.mechanism: GSSAPI
        sasl.kerberos.service.name: kafka

Updated with dynamic destination; it now fails with the exception below:

Caused by: java.lang.IllegalStateException: Cannot perform operation after producer has been closed
    at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:810) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:819) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803) ~[kafka-clients-2.0.0.jar:na]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:423) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:351) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:209) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:382) ~[spring-integration-kafka-3.1.0.RELEASE.jar:3.1.0.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123) [spring-integration-core-5.1.0.RELEASE.jar:5.1.0.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169) [spring-integration-core-5.1.0.RELEASE.jar:5.1.0.RELEASE]

I tried both of the approaches for the dynamic destination resolver issue: dynamic destination resolver
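
For reference, a minimal sketch (not the exact code from my service) of the kind of dynamic-destination send that produces the stack trace above, assuming the destination is resolved at runtime with BinderAwareChannelResolver; the class and method names are illustrative:

    import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.support.MessageBuilder;

    public class DynamicDestinationSender {

        private final BinderAwareChannelResolver resolver;

        public DynamicDestinationSender(BinderAwareChannelResolver resolver) {
            this.resolver = resolver;
        }

        // Resolve an output channel by topic name at runtime and send to it;
        // with transactions enabled, the send goes through the transactional producer.
        public void send(String topic, String payload) {
            MessageChannel channel = resolver.resolveDestination(topic);
            channel.send(MessageBuilder.withPayload(payload).build());
        }
    }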


Solution

  • It works fine for me; these are all in the same app, but that won't make a difference...

    import java.time.Duration;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;
    import org.springframework.integration.dsl.Pollers;
    import org.springframework.integration.handler.LoggingHandler.Level;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.handler.annotation.SendTo;

    @SpringBootApplication
    @EnableBinding(Channels.class)
    public class So55419549Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So55419549Application.class, args);
        }
    
        @Bean
        public IntegrationFlow service1(MessageChannel out1) {
            return IntegrationFlows.from(() -> "foo", e -> e
                        .poller(Pollers.fixedDelay(Duration.ofSeconds(5))))
                    .log(Level.INFO, m -> "s1 " + m.getPayload())
                    .channel(out1)
                    .get();
        }
    
        @StreamListener("in2")
        @SendTo("out2")
        public String service2(String in) {
            System.out.println("s2 " + in);
            return in.toUpperCase();
        }
    
        @StreamListener("in3")
        public void service3(String in) {
            System.out.println("s3 " + in);
        }
    
    }
    
    interface Channels {
    
        @Output
        MessageChannel out1();
    
        @Input
        MessageChannel in2();
    
        @Output
        MessageChannel out2();
    
        @Input
        MessageChannel in3();
    
    }
    

    and

    spring:
      cloud:
        stream:
          bindings:
            out1:
              destination: topic1
            in2:
              group: s2
              destination: topic1
            out2:
              destination: topic2
            in3:
              group: s3
              destination: topic2
          kafka:
            binder:
              transaction:
                transaction-id-prefix: tx
            bindings:
              in2:
                consumer:
                  configuration:
                    isolation:
                      level: read_committed
              in3:
                consumer:
                  configuration:
                    isolation:
                      level: read_committed
      kafka:
        producer:
          # needed again here so boot declares a TM for us
          transaction-id-prefix: tx
          retries: 10
          acks: all
    logging:
      level:
        org.springframework.kafka.transaction: debug
    

    and

    2019-03-29 12:57:08.345  INFO 75700 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler   : s1 foo
    2019-03-29 12:57:08.353 DEBUG 75700 --- [container-0-C-1] o.s.k.t.KafkaTransactionManager          : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
    2019-03-29 12:57:08.353 DEBUG 75700 --- [container-0-C-1] o.s.k.t.KafkaTransactionManager          : Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@6790c874, txId=txs2.topic1.0]]
    s2 foo
    2019-03-29 12:57:08.357 DEBUG 75700 --- [container-0-C-1] o.s.k.t.KafkaTransactionManager          : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
    2019-03-29 12:57:08.358 DEBUG 75700 --- [container-0-C-1] o.s.k.t.KafkaTransactionManager          : Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@820ef3d, txId=txs3.topic2.0]]
    s3 FOO
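
    The KafkaTransactionManager in the DEBUG lines above is the one Boot declares because spring.kafka.producer.transaction-id-prefix is set; roughly, it is equivalent to the following bean definition (a sketch only, assuming a ProducerFactory bean is available):

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.ProducerFactory;
    import org.springframework.kafka.transaction.KafkaTransactionManager;

    @Configuration
    public class TxConfigSketch {

        // Sketch of the transaction manager Boot declares when
        // spring.kafka.producer.transaction-id-prefix is set.
        @Bean
        public KafkaTransactionManager<byte[], byte[]> kafkaTransactionManager(
                ProducerFactory<byte[], byte[]> producerFactory) {
            return new KafkaTransactionManager<>(producerFactory);
        }
    }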
    

    EDIT

    The binder doesn't enable transaction synchronization on the transaction manager. As a workaround, add

    TransactionSynchronizationManager.setActualTransactionActive(true);
    

    to your @StreamListener.
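
    For example, applied to the service2 listener from the sample above (TransactionSynchronizationManager is org.springframework.transaction.support.TransactionSynchronizationManager):

    @StreamListener("in2")
    @SendTo("out2")
    public String service2(String in) {
        // Workaround: mark a transaction as active on this thread so the
        // listener's publish participates in transaction synchronization.
        TransactionSynchronizationManager.setActualTransactionActive(true);
        System.out.println("s2 " + in);
        return in.toUpperCase();
    }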

    I opened a bug against the binder.