gradle apache-kafka quarkus testcontainers smallrye-reactive-messaging

Quarkus integration tests are not picking up test properties


I have a Quarkus application that consumes from a Kafka topic, and I have written Kafka consumer integration tests using Testcontainers. When I run the tests, they pick up the real broker URLs that are saved in my .zshrc file for the Kafka bootstrap servers property instead of localhost, even though I defined test properties.

These are my application properties:

kafka:
  bootstrap:
    servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}

  security:
    protocol: SASL_SSL
  sasl:
    mechanism: SCRAM-SHA-512
  jaas:
    config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${KAFKA_CONSUMER_USERNAME}" password="${KAFKA_CONSUMER_PASSWORD}";


"%test":
   kafka:
     bootstrap:
       servers: kafka:9092
     security:
       protocol: PLAINTEXT
     sasl:
       mechanism: PLAIN

I also use a QuarkusTestResourceLifecycleManager to set the Kafka property:

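import java.util.Map;

import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public class KafkaResource implements QuarkusTestResourceLifecycleManager {

    private static KafkaContainer kafkaContainer;

    private static String bootstrapServers;

    @Override
    public Map<String, String> start() {
        // Start a single-node Kafka broker in KRaft mode.
        kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"))
                .withKraft();
        kafkaContainer.start();
        bootstrapServers = kafkaContainer.getBootstrapServers();
        // Point the application at the container's broker address.
        return Map.of("kafka.bootstrap.servers", bootstrapServers);
    }

    @Override
    public void stop() {
        kafkaContainer.stop();
    }

    // Exposed so the test can create its own producer against the same broker.
    public static String getBootstrapServers() {
        return bootstrapServers;
    }

}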

Consumer integration test:

@QuarkusTest
@QuarkusTestResource(KafkaResource.class)
class ConsumerTest {

  @InjectSpy
  EventConsumer eventConsumer;

  @Test
  void testConsumer() throws InvalidProtocolBufferException, InterruptedException {
    String eventId = "sport1234";
    Event event = Event.newBuilder().setEventId(eventId)
            .build();
    produceTestMessage(event);

    Mockito.verify(eventConsumer, timeout(5000)).consumeState(event.toByteArray());
    System.out.println("Number of invocations: " + Mockito.mockingDetails(eventConsumer).getInvocations().size());
  }

  private void produceTestMessage(Event event) {
    // Create a Kafka producer configuration
    String kafkaBootstrapServers = KafkaResource.getBootstrapServers();
    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", kafkaBootstrapServers);
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "com.org.events.EventProtoKafkaSerializer");

    try (Producer<String, Event> producer = new KafkaProducer<>(producerProps)) {
        ProducerRecord<String, Event> record = new ProducerRecord<>("test_topic", event.getEventId(), event);
        producer.send(record).get(); // Wait for the message to be sent
    } catch (Exception e) {
        throw new RuntimeException("Error producing test message", e);
    }
  }

}

Dependencies:

implementation 'io.quarkus:quarkus-smallrye-reactive-messaging-kafka'
testImplementation 'io.quarkus:quarkus-junit5'
testImplementation 'io.quarkus:quarkus-junit5-mockito'
testImplementation 'io.rest-assured:rest-assured'
testImplementation 'io.quarkus:quarkus-jacoco'
testImplementation 'org.assertj:assertj-core:3.24.2'
testImplementation 'org.testcontainers:junit-jupiter:1.19.3'
testImplementation 'org.testcontainers:mysql:1.19.3'
testImplementation 'org.testcontainers:kafka:1.19.3'

My consumer integration tests work fine if I remove the KAFKA_BOOTSTRAP_SERVERS variable from my .zshrc file. But when I keep it, the tests try to connect to the dev server instead of the localhost address that I defined in the test profile and also set in the KafkaResource class.

What am I missing?


Solution

  • I tried removing all the Kafka-related properties from the application properties file, but the tests still picked up the brokers from the KAFKA_BOOTSTRAP_SERVERS variable in my .zshrc file.

    Then I renamed KAFKA_BOOTSTRAP_SERVERS to KAFKA_BOOTSTRAP_SERVERS_DEV, and all tests passed.

    I think the environment variable KAFKA_BOOTSTRAP_SERVERS was being mapped to kafka.bootstrap.servers, which is the actual property used to set the Kafka brokers, and environment variables take precedence over the values in the application properties file, including the %test profile.

    So renaming the variable solved my issue.
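The name mapping described in this answer can be sketched in plain Java. This is only an illustration of the MicroProfile Config convention (replace every character that is not a letter, digit, or underscore with `_`, then upper-case), combined with a deliberately simplified two-source lookup. It is not the actual Quarkus or SmallRye Config implementation. The class name `EnvNameMapping`, the method names, and the broker addresses are hypothetical.

```java
import java.util.Locale;
import java.util.Map;

public class EnvNameMapping {

    // Assumed convention: non-alphanumeric characters (except '_') become '_',
    // then the whole name is upper-cased.
    public static String toEnvName(String propertyName) {
        return propertyName.replaceAll("[^A-Za-z0-9_]", "_").toUpperCase(Locale.ROOT);
    }

    // Simplified model of precedence: a matching environment variable wins
    // over the value from the application properties file.
    public static String resolve(String propertyName, Map<String, String> env, String fileValue) {
        String fromEnv = env.get(toEnvName(propertyName));
        return fromEnv != null ? fromEnv : fileValue;
    }

    public static void main(String[] args) {
        String property = "kafka.bootstrap.servers";

        // Hypothetical environment, as exported from a shell profile.
        Map<String, String> env = Map.of("KAFKA_BOOTSTRAP_SERVERS", "dev-broker:9093");
        // Same value under the renamed variable, which no longer matches the property.
        Map<String, String> renamedEnv = Map.of("KAFKA_BOOTSTRAP_SERVERS_DEV", "dev-broker:9093");

        System.out.println(toEnvName(property));
        System.out.println(resolve(property, env, "localhost:9092"));
        System.out.println(resolve(property, renamedEnv, "localhost:9092"));
    }
}
```

In this model, the first lookup returns the dev broker because the variable name matches the mapped property name. The second returns the file value because KAFKA_BOOTSTRAP_SERVERS_DEV does not map to kafka.bootstrap.servers, which is consistent with the tests passing after the rename.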