I am developing some integration tests for my application, and one of the flows I'm testing uses Kafka to notify some client about the updates. I have created a KafkaContainer:
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import java.time.Duration;

public class MyKafkaContainer extends KafkaContainer {
    private static final String KAFKA_DOCKER_IMAGE_NAME = "my_image_name";
    private static KafkaContainer kafkaContainer;

    private MyKafkaContainer() {
        super(DockerImageName.parse(KAFKA_DOCKER_IMAGE_NAME));
    }

    public static KafkaContainer getInstance() {
        if (kafkaContainer == null) {
            kafkaContainer = new KafkaContainer(DockerImageName.parse(KAFKA_DOCKER_IMAGE_NAME).asCompatibleSubstituteFor("confluentinc/cp-kafka"))
                    .withStartupAttempts(3)
                    .withStartupTimeout(Duration.ofMinutes(3));
        }
        return kafkaContainer;
    }

    @Override
    public void start() {
        super.start();
    }

    @Override
    public void stop() {
        //do nothing, JVM handles shut down
    }
}
After this, I created an initializer class that I use to set the "bootstrap-servers" value as follows:
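import org.springframework.boot.test.util.TestPropertyValues;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.lifecycle.Startables;

public class TestContainersInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
    private static final KafkaContainer kafkaContainer;

    static {
        // Start the shared container once, before any application context is created
        kafkaContainer = MyKafkaContainer.getInstance();
        Startables.deepStart(kafkaContainer).join();
    }

    @Override
    public void initialize(ConfigurableApplicationContext applicationContext) {
        // Point Spring Kafka at the container's broker address
        TestPropertyValues.of(
                "spring.kafka.bootstrap-servers=" + kafkaContainer.getBootstrapServers()
        ).applyTo(applicationContext.getEnvironment());
    }
}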
Then I used this class to annotate my test class:
@ContextConfiguration(initializers = TestContainersInitializer.class)
Everything starts up nicely until my code flow tries to send a message through Kafka. At that point, while the producer is being instantiated, I am greeted with this error:
org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user.
Do you have any ideas what this could be about? Should I provide some password in the way it is provided in the real application flow?
I have tried playing around with the keyTab and the principal values, since those are set in the application.properties, but since we use the Testcontainers bootstrap server, I don't think I should use any of that. Other than this, I had no other ideas.
I have found out what the problem was, and I am writing this in case it helps someone else. In my application's application.properties file, spring.kafka.security.protocol is set to SASL_PLAINTEXT. After taking a look at this website, I found that this option requires authentication, which is not normally set up in Testcontainers.
Basically, the fix was to override this in my TestContainersInitializer.initialize() method by adding this entry to the test properties: "spring.kafka.security.protocol=PLAINTEXT"
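For anyone who wants to see the override spelled out, here is a minimal sketch of the two property pairs the initializer ends up applying. The class name KafkaTestProperties and the method overridesFor are hypothetical helpers invented for this illustration; only the two property keys and the PLAINTEXT value come from the fix described above.

```java
import java.util.List;

// Hypothetical helper (not part of Spring or Testcontainers): collects the
// property pairs that the test initializer should apply to the environment.
final class KafkaTestProperties {

    private KafkaTestProperties() {
    }

    // bootstrapServers is expected to be the value returned by
    // kafkaContainer.getBootstrapServers() in the initializer.
    static List<String> overridesFor(String bootstrapServers) {
        return List.of(
                // Point the application at the container's broker
                "spring.kafka.bootstrap-servers=" + bootstrapServers,
                // Replace SASL_PLAINTEXT from application.properties so the
                // client does not attempt a SASL login against the test broker
                "spring.kafka.security.protocol=PLAINTEXT");
    }
}
```

Inside initialize(), this list could then be passed to TestPropertyValues.of(KafkaTestProperties.overridesFor(kafkaContainer.getBootstrapServers())).applyTo(applicationContext.getEnvironment()). Passing the two strings directly to TestPropertyValues.of(...) works just as well; the helper only keeps the test overrides in one place.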