Tags: java, spring-boot, integration-testing, spring-kafka, testcontainers

Integration tests with Kafka and Testcontainers


I am developing integration tests for my application, and one of the flows under test uses Kafka to notify clients about updates. I have created a KafkaContainer wrapper:


import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;

public class MyKafkaContainer extends KafkaContainer {

    private static final String KAFKA_DOCKER_IMAGE_NAME = "my_image_name";

    // Single shared instance, reused by every test class in the JVM.
    private static MyKafkaContainer kafkaContainer;

    private MyKafkaContainer() {
        // Declare the custom image as a drop-in replacement for the Confluent image.
        super(DockerImageName.parse(KAFKA_DOCKER_IMAGE_NAME).asCompatibleSubstituteFor("confluentinc/cp-kafka"));
    }

    public static synchronized KafkaContainer getInstance() {
        if (kafkaContainer == null) {
            kafkaContainer = new MyKafkaContainer();
            kafkaContainer.withStartupAttempts(3)
                    .withStartupTimeout(Duration.ofMinutes(3));
        }
        return kafkaContainer;
    }

    @Override
    public void stop() {
        // Do nothing: the container is cleaned up when the JVM shuts down.
    }
}

After this, I created an initializer class that sets the "bootstrap-servers" value as follows:

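import org.springframework.boot.test.util.TestPropertyValues;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.lifecycle.Startables;

public class TestContainersInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {

    private static final KafkaContainer kafkaContainer;

    static {
        kafkaContainer = MyKafkaContainer.getInstance();

        // Start the container once, before any application context is created.
        Startables.deepStart(kafkaContainer).join();
    }

    @Override
    public void initialize(ConfigurableApplicationContext applicationContext) {
        // Point the application at the broker running inside the container.
        TestPropertyValues.of(
                "spring.kafka.bootstrap-servers=" + kafkaContainer.getBootstrapServers()
        ).applyTo(applicationContext.getEnvironment());
    }
}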

Then I used this class to annotate my test class:

@ContextConfiguration(initializers = TestContainersInitializer.class)

Everything starts up fine until my code flow tries to send a message through Kafka. When the producer is instantiated, I am greeted with this error:

org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user.

Do you have any idea what this could be about? Should I provide a password the same way it is provided in the real application flow?

I have tried playing around with the keyTab and principal values, since those are set in application.properties, but since the tests use the Testcontainers bootstrap server, I don't think I should need any of that. Other than this, I had no other ideas.


Solution

  • I found out what the problem was, and I am writing it up in case it helps someone else. In my application's application.properties file, spring.kafka.security.protocol is set to SASL_PLAINTEXT. According to a website I found, this option requires authentication, which is not normally set up in a Testcontainers Kafka container.

    The fix was to override this property in my TestContainersInitializer.initialize() method with: "spring.kafka.security.protocol=PLAINTEXT"
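The override described above can be sketched as a small helper that builds the "key=value" pairs for the initializer. This is only an illustration under assumptions: the class name KafkaTestProperties and the method forBootstrapServers are hypothetical and do not appear in the original code, and "localhost:9092" is a placeholder for whatever kafkaContainer.getBootstrapServers() returns at runtime.

```java
import java.util.Arrays;

// Hypothetical helper (not part of the original post): builds the
// "key=value" pairs that the test initializer applies to the environment.
final class KafkaTestProperties {

    private KafkaTestProperties() {
        // Static helper only, no instances.
    }

    static String[] forBootstrapServers(String bootstrapServers) {
        return new String[] {
                // Point the application at the broker inside the container.
                "spring.kafka.bootstrap-servers=" + bootstrapServers,
                // Override the SASL_PLAINTEXT value from application.properties,
                // so the client does not attempt a SASL login during tests.
                "spring.kafka.security.protocol=PLAINTEXT"
        };
    }

    public static void main(String[] args) {
        // Placeholder address; in the tests this would come from the container.
        System.out.println(Arrays.toString(forBootstrapServers("localhost:9092")));
    }
}
```

Inside TestContainersInitializer.initialize(), the pairs could then be applied with TestPropertyValues.of(KafkaTestProperties.forBootstrapServers(kafkaContainer.getBootstrapServers())).applyTo(applicationContext.getEnvironment()), which keeps both test-only overrides in one place.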