Tags: docker, apache-kafka, spring-integration, spock, testcontainers

Create topic in KafkaContainer for Integration Test


I'm trying to create a topic in my KafkaContainer using @ServiceConnection. When running the application, I get an error as soon as I try to produce to the topic: "Topic myTopic not present in metadata after 1000 ms."

I'm also seeing "Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected", which makes me think my @ServiceConnection isn't set up correctly.

I am using @Testcontainers to spin up an SFTP instance and grab the mounted file from it. That file then goes through integration flows until it is produced onto the topic.

Code:

@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
@ActiveProfiles("test")
class FullIntegrationTest extends Specification {

    @Bean
    @ServiceConnection
    KafkaContainer kafkaContainer() {
        return new KafkaContainer(DockerImageName.parse('confluentinc/cp-kafka:latest'))
                .withEnv("TOPIC_AUTO_CREATE", "true")
    }

I can provide more information as needed. If this is not the correct way of configuring topics, could someone provide an example? I have also been following this article, but still couldn't get it to create the new topic.

EDIT: application-test.properties

spring.kafka.properties.schema.registry.url                  = mock://notused
spring.kafka.bootstrap-servers                               = localhost:9092
spring.kafka.properties.sasl.mechanism                       = ""
spring.kafka.properties.security.protocol                    = PLAINTEXT
spring.kafka.properties.basic.auth.credentials.source        = ""

spring.kafka.properties.schema.registry.basic.auth.user.info = ""
spring.kafka.properties.sasl.jaas.config                     = ""
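
One thing worth checking in the properties above: in a .properties file, quotes are not string delimiters, so a value written as "" is stored as two literal quote characters rather than as an empty string. The sketch below shows this with plain java.util.Properties. It assumes Spring Boot's own properties loader treats quotes the same way, and QuotedPropertyDemo and its load helper are hypothetical names used only for illustration. Whether this contributes to the error described here is not clear.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class QuotedPropertyDemo {

    // Parses a single .properties line and returns the value stored under key.
    static String load(String line, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(line));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        String key = "spring.kafka.properties.sasl.mechanism";
        String quoted = load(key + " = \"\"", key); // as written in the question
        String empty = load(key + " =", key);       // a genuinely empty value
        System.out.println("quoted length: " + quoted.length());
        System.out.println("empty length: " + empty.length());
    }
}
```

If an empty value is what is intended, leaving the right-hand side blank (or dropping the property) is the form that yields an empty string.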

Solution

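  • I had to remove @ServiceConnection, add Zookeeper, and set the topic-related environment variables on the container. The broker address is then passed to Spring through @DynamicPropertySource instead.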

        // One Kafka container instance for the whole spec.
        @Shared
        static final KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse('confluentinc/cp-kafka:latest'))
                .withEmbeddedZookeeper()
                .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
                .withEnv("KAFKA_CREATE_TOPICS", Constants.KAFKA_TOPIC + ":1:1")

        // Point Spring at the addresses Testcontainers actually exposes, overriding
        // the hard-coded localhost:9092 from application-test.properties.
        @DynamicPropertySource
        static void dynamicProperties(DynamicPropertyRegistry registry) {
            registry.add("spring.kafka.bootstrap-servers", () -> kafkaContainer.getBootstrapServers())
            // sftpDockerContainer is declared elsewhere in the spec (not shown here).
            registry.add("engage.server", () -> sftpDockerContainer.getServiceHost("sftp_server", 22))
            registry.add("engage.port", () -> sftpDockerContainer.getServicePort("sftp_server", 22))
        }
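
The @DynamicPropertySource method registers lambdas rather than plain values because the host port that Testcontainers maps to the broker is only known once the container is running. The following is a simplified simulation of that idea using only the JDK. It is not Spring or Testcontainers code: FakeContainer and FakeRegistry are hypothetical stand-ins, and the port 54321 is made up for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

public class LazyPropertyDemo {

    // Hypothetical stand-in for a container whose host port is unknown until start().
    static final class FakeContainer {
        private Integer mappedPort; // null until the container is "started"

        void start(int port) {
            mappedPort = port;
        }

        String getBootstrapServers() {
            if (mappedPort == null) {
                throw new IllegalStateException("container not started");
            }
            return "PLAINTEXT://localhost:" + mappedPort;
        }
    }

    // Hypothetical stand-in for a property registry: stores suppliers, resolves them on read.
    static final class FakeRegistry {
        private final Map<String, Supplier<Object>> suppliers = new LinkedHashMap<>();

        void add(String name, Supplier<Object> valueSupplier) {
            suppliers.put(name, valueSupplier);
        }

        Object resolve(String name) {
            return suppliers.get(name).get();
        }
    }

    static String demo() {
        FakeContainer kafka = new FakeContainer();
        FakeRegistry registry = new FakeRegistry();

        // Registered before the container has a port; nothing is evaluated yet.
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);

        kafka.start(54321);

        // The supplier runs only now, so it sees the mapped port.
        return (String) registry.resolve("spring.kafka.bootstrap-servers");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Had the registry stored the result of getBootstrapServers() at registration time instead of a supplier, the call would have failed in this simulation because the container had not been started yet.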