I'm trying to create a topic in my KafkaContainer using @ServiceConnection
- when running the application I get an error message when I'm trying to produce to the topic saying Topic myTopic not present in metadata after 1000 ms.
Also seeing: Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
Which makes me think my @ServiceConnection
isn't setup correctly.
I am using the @Testcontainers
to spin up an SFTP instance, to grab the mounted file. That file then goes through integration flows, until it is produced onto the topic.
Code:
@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
@ActiveProfiles("test")
class FullIntegrationTest extends Specification {
@Bean
@ServiceConnection
KafkaContainer kafkaContainer() {
return new KafkaContainer(DockerImageName.parse('confluentinc/cp-kafka:latest'))
.withEnv("TOPIC_AUTO_CREATE", "true")
}
I can provide more information as needed. If this is not the correct way of configuring the topics I was wondering if someone can provide me with an example. I have also been following this article, but still couldn't get it configure the new topic.
EDIT: application-test.properties
spring.kafka.properties.schema.registry.url = mock://notused
spring.kafka.bootstrap-servers = localhost:9092
spring.kafka.properties.sasl.mechanism = ""
spring.kafka.properties.security.protocol = PLAINTEXT
spring.kafka.properties.basic.auth.credentials.source = ""
spring.kafka.properties.schema.registry.basic.auth.user.info = ""
spring.kafka.properties.sasl.jaas.config = ""
I had to remove the @ServiceConnection
and added Zookeeper, with the env variables.
@Shared
static final KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse('confluentinc/cp-kafka:latest'))
.withEmbeddedZookeeper()
.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
.withEnv("KAFKA_CREATE_TOPICS", Constants.KAFKA_TOPIC + ":1:1")
@DynamicPropertySource
static void dynamicProperties(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", () -> kafkaContainer.getBootstrapServers())
registry.add("engage.server", () -> sftpDockerContainer.getServiceHost("sftp_server", 22))
registry.add("engage.port", () -> sftpDockerContainer.getServicePort("sftp_server", 22))
}