Tags: java, apache-kafka, kafka-producer-api, testcontainers

Producer#initTransactions doesn't work with KafkaContainer


I am trying to send messages to Kafka within a transaction, using this code:

try (Producer<Void, String> producer = createProducer(kafkaContainerBootstrapServers)) {
    producer.initTransactions();
    producer.beginTransaction();
    Arrays.stream(messages).forEach(
        message -> producer.send(new ProducerRecord<>(KAFKA_INPUT_TOPIC, message)));
    producer.commitTransaction();
}

...

private static Producer<Void, String> createProducer(String kafkaContainerBootstrapServers) {
    return new KafkaProducer<>(
        ImmutableMap.of(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainerBootstrapServers,
            ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString(),
            ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true,
            ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString()
        ),
        new VoidSerializer(),
        new StringSerializer());
}

With a local Kafka broker, this works fine.

But with the Testcontainers KafkaContainer, it hangs on producer.initTransactions():

private static final String KAFKA_VERSION = "4.1.1";

@Rule
public KafkaContainer kafka = new KafkaContainer(KAFKA_VERSION)
    .withEmbeddedZookeeper();

How can I configure KafkaContainer to work with transactions?


Solution

  • Try using Kafka for JUnit instead of the Testcontainers Kafka module. I had the same problem with transactions and got them working this way.

    The Maven dependency I used:

    <dependency>
        <groupId>net.mguenther.kafka</groupId>
        <artifactId>kafka-junit</artifactId>
        <version>2.1.0</version>
        <scope>test</scope>
    </dependency>
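
A possible alternative that keeps KafkaContainer, offered as an assumption rather than something the answer above confirms: Kafka brokers default to transaction.state.log.replication.factor=3 and transaction.state.log.min.isr=2, which a single-broker container cannot satisfy, and that may be why initTransactions() blocks. The Confluent images read broker settings from KAFKA_-prefixed environment variables, so the sketch below derives the two variable names and values for a one-broker setup. The class and method names are hypothetical, and the override has not been tested against the setup in the question.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper: builds environment overrides for a single-broker test container.
public class SingleBrokerTransactionEnv {

    // Confluent image convention for these properties: prefix with KAFKA_,
    // upper-case the name, and replace dots with underscores.
    static String toEnvName(String brokerProperty) {
        return "KAFKA_" + brokerProperty.toUpperCase(Locale.ROOT).replace('.', '_');
    }

    // Assumption: one broker, so both transaction state log settings are lowered to 1.
    static Map<String, String> singleBrokerOverrides() {
        Map<String, String> env = new LinkedHashMap<>();
        env.put(toEnvName("transaction.state.log.replication.factor"), "1");
        env.put(toEnvName("transaction.state.log.min.isr"), "1");
        return env;
    }

    public static void main(String[] args) {
        // Print the overrides in NAME=value form.
        singleBrokerOverrides().forEach((name, value) -> System.out.println(name + "=" + value));
    }
}
```

With Testcontainers, each entry could then be passed to the container through withEnv(name, value), which KafkaContainer inherits from GenericContainer.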