Search code examples
javaspring-bootapache-kafkaspring-kafka

Write to two Kafka topics in a single transaction using Spring Kafka


I'm trying to work out if there's a way of using Kafka's transaction feature to write to two topics within a transaction.

I know the typical scenario to use Kafka's transactions is in a consumer-producer pattern and that seems well documented.

What I've tried:

  1. created a KafkaTransactionManager per topic
  2. configured each ProducerFactory to use their respective transaction manager
  3. Created a ChainedTransactionManger with the two instances of KafkaTransactionManager
  4. Created a KafkaTemplate per topic

    I then used the @Transactional(transactionManager = "chainedTx") annotation on a method that that does:

    template1.send("topic1", "example payload");
    template2.send("topic2", "example payload");
    

This doesn't work. The KafkaTemplate is transactional, but when the send() method is called, there is no transaction in progress and I get an IllegalStateException.

I was going to try the KafkaTemplate.executeInTransaction() method, but the Javadoc states this is only for local transactions, so it does not appear to fit my needs.

My next step is to try using Kafka's Producer API directly to see if this pattern works, but I'd appreciate it if someone can tell me know that I'm wasting my time and Kafka doesn't support transactionally writing to multiple topics.

I did find this statement in Confluent's blog on Kafka transaction support:

Transactions enable atomic writes to multiple Kafka topics and partitions...

But I haven't found any examples that demonstrate it.

Configuration of the first producer

@Configuration public class ControlProducerConfig {

@Bean("controlTransactionManager")
KafkaTransactionManager<String, String> transactionManager() {
    return  new KafkaTransactionManager<>(factory());
}

@Bean("controlTemplate")
public KafkaTemplate<String, String> template() {
    return new KafkaTemplate<>(factory());
}

private ProducerFactory<String, String> factory() {
    DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(config());
    factory.setTransactionIdPrefix("abcd");
    return factory;
}

private Map<String, Object> config() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xxx.xxx.xxx");

    props.put("schema.registry.url", "http://xxx.xxx.xxx.xxx/");

    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

    // you can't set idempotence without setting max in flight requests to <= 5
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "1234");

    return props;
}

}

Configuration of the second producer

@Configuration
public class PayloadProducerConfig {


@Bean("payloadTransactionManager")
KafkaTransactionManager<String, String> transactionManager() {
    return new KafkaTransactionManager<>(factory());
}

@Bean("payloadTemplate")
public KafkaTemplate<String, String> template() {
    return new KafkaTemplate<>(factory());
}

private ProducerFactory<String, String> factory() {
    DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(config());
    factory.setTransactionIdPrefix("abcd");
    return factory;
}

private Map<String, Object> config() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xxx.xxx.xxx");

    props.put("schema.registry.url", "http://xxx.xxx.xxx.xxx/");

    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

    // you can't set idempotence without setting max in flight requests to <= 5
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "1234");

    return props;
}

}

Main class

@EnableTransactionManagement
@SpringBootApplication
public class App {

public static void main(String[] args) {
    SpringApplication.run(App.class, args);
}

@Bean("chainedTx")
public ChainedTransactionManager chained(
    @Qualifier("controlTransactionManager") KafkaTransactionManager controlTransactionManager,
    @Qualifier("payloadTransactionManager") KafkaTransactionManager payloadTransactionManager) {

    return new ChainedTransactionManager(controlTransactionManager, payloadTransactionManager);
}

@Bean OnStart onStart(PostTwoMessages postTwoMessages) {
    return new OnStart(postTwoMessages);
}

@Bean
public PostTwoMessages postTwoMessages(
    @Qualifier("controlTemplate") KafkaTemplate<String, String> controlTemplate,
    @Qualifier("controlTemplate") KafkaTemplate<String, String> payloadTemplate) {

    return new PostTwoMessages(controlTemplate, payloadTemplate);
}

}

On application start

public class OnStart implements ApplicationListener<ApplicationReadyEvent> {

private PostTwoMessages postTwoMessages;

public OnStart(PostTwoMessages postTwoMessages) {
    this.postTwoMessages = postTwoMessages;
}

@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
    postTwoMessages.run();
}

}

Posting the two messages

public class PostTwoMessages  {

private final KafkaTemplate<String, String> controlTemplate;
private final KafkaTemplate<String, String> payloadTemplate;

public PostTwoMessages(
    @Qualifier("controlTemplate") KafkaTemplate<String, String> controlTemplate,
    @Qualifier("payloadTemplate") KafkaTemplate<String, String> payloadTemplate) {

    this.controlTemplate = controlTemplate;
    this.payloadTemplate = payloadTemplate;
}

@Transactional(transactionManager = "chainedTx")
public void run() {
    UUID uuid = UUID.randomUUID();
    controlTemplate.send("private.s0869y.trx.model3a", "control: " + uuid);
    payloadTemplate.send("private.s0869y.trx.model3b", "payload: " + uuid);
}

}


Solution

  • It should work; do you have @EnableTransactionManagement?

    However, transactions can't span 2 different producers; you have to do both sends using the same template. Otherwise it's 2 different transactions.

    EDIT

    Here's an example with a Spring Boot application:

    EDIT2

    Update example to show using a local transaction via executeInTransaction.

    @SpringBootApplication
    public class So54865968Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So54865968Application.class, args);
        }
    
        @Bean
        public ApplicationRunner runner(Foo foo) {
            return args -> {
                foo.runInTx();
                System.out.println("Committed 1");
                foo.runInLocalTx();
                System.out.println("Committed 2");
            };
        }
    
        @Bean
        public Foo foo(KafkaTemplate<String, Object> template) {
            return new Foo(template);
        }
    
        @Bean
        public Bar bar() {
            return new Bar();
        }
    
        @Bean
        public NewTopic topic1() {
            return new NewTopic("so54865968-1", 1, (short) 1);
        }
    
        @Bean
        public NewTopic topic2() {
            return new NewTopic("so54865968-2", 1, (short) 1);
        }
    
        public static class Foo {
    
            private final KafkaTemplate<String, Object> template;
    
            public Foo(KafkaTemplate<String, Object> template) {
                this.template = template;
            }
    
            @Transactional(transactionManager = "kafkaTransactionManager")
            public void runInTx() throws InterruptedException {
                this.template.send("so54865968-1", 42);
                this.template.send("so54865968-2", "texttest");
                System.out.println("Sent 2; waiting a few seconds to commit");
                Thread.sleep(5_000);
            }
    
            public void runInLocalTx() throws InterruptedException {
                this.template.executeInTransaction(t -> {
                    t.send("so54865968-1", 43);
                    t.send("so54865968-2", "texttest2");
                    System.out.println("Sent 2; waiting a few seconds to commit");
                    try {
                        Thread.sleep(5_000);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    return true;
                });
            }
    
        }
    
        public static class Bar {
    
            @KafkaListener(id = "foo", topics = { "so54865968-1", "so54865968-2" })
            public void haandler(byte[] bytes) {
                if (bytes.length == 4) {
                    ByteBuffer bb = ByteBuffer.wrap(bytes);
                    System.out.println("Received int " + bb.getInt());
                }
                else {
                    System.out.println("Received string " + new String(bytes));
                }
            }
    
        }
    
    }
    

    and

    spring.kafka.producer.transaction-id-prefix=tx-id
    spring.kafka.producer.properties.value.serializer=com.example.CompositeSerializer
    
    spring.kafka.consumer.enable-auto-commit=false
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.properties.isolation.level=read_committed
    spring.kafka.consumer.properties.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
    

    and

    public class CompositeSerializer implements Serializer<Object> {
    
        private final StringSerializer stringSerializer = new StringSerializer();
    
        private final IntegerSerializer intSerializer = new IntegerSerializer();
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
        }
    
        @Override
        public byte[] serialize(String topic, Object data) {
            return data instanceof Integer ? intSerializer.serialize(topic, (Integer) data)
                    : stringSerializer.serialize(topic, (String) data);
        }
    
        @Override
        public void close() {
        }
    
    }
    

    and

    Received int 42
    Received string texttest
    

    Both showed up after the 5 second pause.