I have a Quarkus application that consumes from a Kafka topic, and I have written Kafka consumer integration tests using Testcontainers. When I run the tests, they pick up the real broker URL that is exported as KAFKA_BOOTSTRAP_SERVERS in my .zshrc file instead of the local broker, even though I defined the test properties.
These are the application properties:
kafka:
  bootstrap:
    servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
  security:
    protocol: SASL_SSL
  sasl:
    mechanism: SCRAM-SHA-512
    jaas:
      config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${KAFKA_CONSUMER_USERNAME}"
        password="${KAFKA_CONSUMER_PASSWORD}";
"%test":
  kafka:
    bootstrap:
      servers: kafka:9092
    security:
      protocol: PLAINTEXT
    sasl:
      mechanism: PLAIN
I have also used a QuarkusTestResourceLifecycleManager to set the Kafka property:
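import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.util.Map;

public class KafkaResource implements QuarkusTestResourceLifecycleManager {

    private static KafkaContainer kafkaContainer;
    private static String bootstrapServers;

    @Override
    public Map<String, String> start() {
        // Start a single-node Kafka broker in KRaft mode
        kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"))
                .withKraft();
        kafkaContainer.start();
        bootstrapServers = kafkaContainer.getBootstrapServers();
        // Hand the container's address to Quarkus as a config override
        // (the unreachable second return statement has been removed)
        return Map.of("kafka.bootstrap.servers", bootstrapServers);
    }

    @Override
    public void stop() {
        // Guard against start() having failed before the container was created
        if (kafkaContainer != null) {
            kafkaContainer.stop();
        }
    }

    public static String getBootstrapServers() {
        return bootstrapServers;
    }
}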
Consumer integration test:
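import com.google.protobuf.InvalidProtocolBufferException;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.mockito.InjectSpy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.Properties;

import static org.mockito.Mockito.timeout;

// Event and EventConsumer are application classes; their imports are omitted here.
@QuarkusTest
@QuarkusTestResource(KafkaResource.class)
class ConsumerTest {

    @InjectSpy
    EventConsumer eventConsumer;

    @Test
    void testConsumer() throws InvalidProtocolBufferException, InterruptedException {
        String eventId = "sport1234";
        Event event = Event.newBuilder().setEventId(eventId)
                .build();
        produceTestMessage(event);
        // Wait up to 5 seconds for the consumer to receive the serialized event
        Mockito.verify(eventConsumer, timeout(5000)).consumeState(event.toByteArray());
        System.out.println("Number of invocations: " + Mockito.mockingDetails(eventConsumer).getInvocations().size());
    }

    private void produceTestMessage(Event event) {
        // Create a Kafka producer that points at the test container
        String kafkaBootstrapServers = KafkaResource.getBootstrapServers();
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", kafkaBootstrapServers);
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "com.org.events.EventProtoKafkaSerializer");
        try (Producer<String, Event> producer = new KafkaProducer<>(producerProps)) {
            ProducerRecord<String, Event> record = new ProducerRecord<>("test_topic", event.getEventId(), event);
            producer.send(record).get(); // Wait for the message to be sent
        } catch (Exception e) {
            throw new RuntimeException("Error producing test message", e);
        }
    }
}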
Dependencies:
implementation 'io.quarkus:quarkus-smallrye-reactive-messaging-kafka'
testImplementation 'io.quarkus:quarkus-junit5'
testImplementation 'io.quarkus:quarkus-junit5-mockito'
testImplementation 'io.rest-assured:rest-assured'
testImplementation 'io.quarkus:quarkus-jacoco'
testImplementation 'org.assertj:assertj-core:3.24.2'
testImplementation 'org.testcontainers:junit-jupiter:1.19.3'
testImplementation 'org.testcontainers:mysql:1.19.3'
testImplementation 'org.testcontainers:kafka:1.19.3'
Now my consumer integration tests work fine if I remove the KAFKA_BOOTSTRAP_SERVERS variable from my .zshrc file. But when I keep it, the tests try to connect to the dev server rather than the local broker that I defined in the test profile and also set in KafkaResource.
What am I missing?
I tried removing all the Kafka-related properties from the application properties file, but it was still picking up the brokers from the KAFKA_BOOTSTRAP_SERVERS variable in my .zshrc file.
Then I renamed KAFKA_BOOTSTRAP_SERVERS to KAFKA_BOOTSTRAP_SERVERS_DEV, and all tests passed.
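I think the environment variable KAFKA_BOOTSTRAP_SERVERS was being mapped to kafka.bootstrap.servers, which is the actual property Kafka uses for the broker list. Environment variables take precedence over values in the application properties file, including the test profile, and in my case the variable apparently also won over the value returned by KafkaResource.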
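The name mapping suspected here can be sketched in a few lines. This is only an illustration of the MicroProfile Config rule as I understand it (replace every character that is not a letter or digit with an underscore, then upper-case the result), not the actual Quarkus/SmallRye code; the class and method names are hypothetical.

```java
import java.util.Locale;

public class EnvNameMapping {

    // Hypothetical helper: derives the environment variable name that would be
    // looked up for a given config property, following the rule described above.
    public static String toEnvName(String property) {
        return property.replaceAll("[^A-Za-z0-9]", "_").toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        String derived = toEnvName("kafka.bootstrap.servers");
        System.out.println(derived); // prints KAFKA_BOOTSTRAP_SERVERS

        // The renamed variable no longer matches the derived name,
        // so it cannot shadow kafka.bootstrap.servers any more.
        System.out.println(derived.equals("KAFKA_BOOTSTRAP_SERVERS_DEV")); // prints false
    }
}
```

Under this rule, an exported KAFKA_BOOTSTRAP_SERVERS is a candidate value for kafka.bootstrap.servers even when no placeholder refers to it, which matches what I saw after removing the Kafka properties from the file.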
So renaming the variable solved my issue.
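To illustrate why the exported variable won, here is a minimal sketch of ordinal-based config resolution. It assumes the default ordinals I know from MicroProfile Config and Quarkus (environment variables 300, application.properties 250) and does not model how values from a QuarkusTestResource are ranked. The Source class, the resolve method and the dev-broker:9092 address are hypothetical.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class ConfigPrecedenceSketch {

    // Hypothetical stand-in for a config source: a name, an ordinal and its values.
    public static final class Source {
        final String name;
        final int ordinal;
        final Map<String, String> values;

        public Source(String name, int ordinal, Map<String, String> values) {
            this.name = name;
            this.ordinal = ordinal;
            this.values = values;
        }
    }

    // Returns the value from the highest-ordinal source that defines the key.
    public static Optional<String> resolve(List<Source> sources, String key) {
        return sources.stream()
                .filter(s -> s.values.containsKey(key))
                .max(Comparator.comparingInt(s -> s.ordinal))
                .map(s -> s.values.get(key));
    }

    public static void main(String[] args) {
        // Assumed ordinals: environment variables 300, application config file 250.
        Source env = new Source("environment", 300,
                Map.of("kafka.bootstrap.servers", "dev-broker:9092")); // hypothetical dev address
        Source appFile = new Source("application config (test profile)", 250,
                Map.of("kafka.bootstrap.servers", "kafka:9092"));

        // With the variable exported, the environment source wins.
        System.out.println(resolve(List.of(appFile, env), "kafka.bootstrap.servers").orElse("<none>"));

        // After renaming the variable, only the file defines the key.
        System.out.println(resolve(List.of(appFile), "kafka.bootstrap.servers").orElse("<none>"));
    }
}
```

The first line printed is the dev address and the second is kafka:9092, which mirrors the behaviour before and after the rename.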