Tags: spring, apache-kafka, spring-cloud, spring-cloud-stream

Why isn't the KafkaTransactionManager being applied to this Spring Cloud Stream Kafka Producer?


I have configured a Spring Cloud Stream Kafka application to use transactions (full source code available on GitHub):

spring:
  application:
    name: message-relay-service
  cloud:
    stream:
      kafka:
        binder:
          transaction:
            transaction-id-prefix: message-relay-tx-
            producer:
              configuration:
                retries: 1
                acks: all
                key:
                  serializer: org.apache.kafka.common.serialization.StringSerializer

      bindings:
        output:
          destination: transfer
          contentType: application/*+avro
      schema-registry-client:
        endpoint: http://localhost:8081
      schema:
        avro:
          subjectNamingStrategy: org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy
  datasource:
    url: jdbc:h2:tcp://localhost:9090/mem:mydb
    driver-class-name: org.h2.Driver
    username: sa
    password:
  jpa:
    hibernate:
      ddl-auto: create
    database-platform: org.hibernate.dialect.H2Dialect

server:
  port: 8085

This app has a @Scheduled method that periodically checks the database for records to send. The method is annotated with @Transactional, and the main class is annotated with @EnableTransactionManagement.

However, when debugging the code I realized that the KafkaTransactionManager isn't being invoked; that is, no Kafka transactions are in place. What's the problem?

@EnableTransactionManagement
@EnableBinding(Source::class)
@EnableScheduling
@SpringBootApplication
class MessageRelayServiceApplication

fun main(args: Array<String>) {
    runApplication<MessageRelayServiceApplication>(*args)
}

---

@Component
class MessageRelay(private val outboxService: OutboxService,
                   private val source: Source) {

    @Transactional
    @Scheduled(fixedDelay = 10000)
    fun checkOutbox() {
        val pending = outboxService.getPending()
        pending.forEach {
            val message = MessageBuilder.withPayload(it.payload)
                    .setHeader(KafkaHeaders.MESSAGE_KEY, it.messageKey)
                    .setHeader(MessageHeaders.CONTENT_TYPE, it.contentType)
                    .build()
            source.output().send(message)
            outboxService.markAsProcessed(it.id)
        }
    }

}


Solution

  • I don't see @EnableTransactionManagement in account-service, only in message-relay-service.

    In any case, your scenario is not currently supported. The transactional binder was designed for processors, where the consumer starts the transaction, any records sent on the consumer thread participate in that transaction, and the consumer then sends the offset to the transaction and commits it.

    It was not designed for producer-only bindings; please open a GitHub issue against the binder because it should be supported.

    I am not sure why you are not seeing a transaction start. But even if one did, the problem is that @Transactional would use Boot's auto-configured KafkaTransactionManager (KTM) and its producer factory, while the binding uses a different producer factory (the one built from your binder configuration).

    Even if a transaction is in progress, the binding's producer won't participate in it.
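
    The producer factory mismatch described in this answer can be illustrated with a toy model. This is only a sketch: ToyProducer, ToyProducerFactory, ToyTransactionManager and sendAndFail are hypothetical stand-ins written for this example, not Spring or Kafka classes, and the real components are considerably more involved. It assumes only that a transaction manager begins, commits and aborts transactions on the producer of the one factory it was given.

```kotlin
// Toy model only: every class here is a hypothetical stand-in,
// not a Spring, Spring Cloud Stream, or Kafka type.

/** Stand-in for a transactional producer. */
class ToyProducer(val name: String) {
    var inTransaction = false
        private set
    val committed = mutableListOf<String>()
    private val pending = mutableListOf<String>()

    fun begin() { inTransaction = true }

    fun send(record: String) {
        // Outside a transaction the record is published immediately.
        if (inTransaction) pending += record else committed += record
    }

    fun commit() {
        committed += pending
        pending.clear()
        inTransaction = false
    }

    fun abort() {
        pending.clear()
        inTransaction = false
    }
}

/** Stand-in for a producer factory: each factory owns its own producer. */
class ToyProducerFactory(name: String) {
    val producer = ToyProducer(name)
}

/** Stand-in for a transaction manager tied to exactly one factory. */
class ToyTransactionManager(private val factory: ToyProducerFactory) {
    fun execute(block: () -> Unit) {
        factory.producer.begin()
        try {
            block()
            factory.producer.commit()
        } catch (e: Exception) {
            factory.producer.abort()
            throw e
        }
    }
}

/** Sends one record through the binding's factory, then fails inside the transaction. */
fun sendAndFail(manager: ToyTransactionManager, bindingFactory: ToyProducerFactory): List<String> {
    try {
        manager.execute {
            bindingFactory.producer.send("transfer-1")
            throw IllegalStateException("simulated failure after send")
        }
    } catch (e: IllegalStateException) {
        // Expected: the manager has rolled back its own producer.
    }
    return bindingFactory.producer.committed.toList()
}

fun main() {
    // Manager and binding use different factories: the send escapes the rollback.
    val bootFactory = ToyProducerFactory("boot")
    val binderFactory = ToyProducerFactory("binder")
    println(sendAndFail(ToyTransactionManager(bootFactory), binderFactory)) // [transfer-1]

    // Manager and binding share one factory: the send is rolled back.
    val shared = ToyProducerFactory("shared")
    println(sendAndFail(ToyTransactionManager(shared), shared)) // []
}
```

    In the real application, the equivalent of the second case would be a transaction manager and an output binding that share one producer factory. Whether and how the binder exposes its factory depends on the binder version, so check its documentation.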