Search code examples
spring-boottestingintegration-testingspring-cloud-streamembedded-kafka

Spring Cloud Stream Application not connecting to EmbeddedKafka when testing


Attempting to test a basic Spring Cloud Stream application, the EmbeddedKafka broker never get the message published when running the tests.

Here is a github repo for my current code : https://github.com/gjobin/spring-cloud-stream-test

When running the application, I can see the messages published to a local or remote borker and also the Consumer output shows details about Kafka headers.

------ Consumed Record ------
Person(firstname=fn, lastname=LN)
deliveryAttempt : 1
kafka_timestampType : CREATE_TIME
kafka_receivedTopic : sink-in-0
kafka_offset : 46
scst_nativeHeadersPresent : true
kafka_consumer : org.apache.kafka.clients.consumer.KafkaConsumer@2994db0
source-type : kafka
id : 97a186be-5fcb-cc47-26a1-46b9895f6b32
kafka_receivedPartitionId : 0
contentType : application/json
kafka_receivedTimestamp : 1694174130515
kafka_groupId : anonymous.754c77d9-b7a7-4221-9132-34b5c347e299
timestamp : 1694174130552

However, whenever I run the app through @SpringBootTest, the output becomes more generic to SCS and ommits any Kafka headers

------ Consumed Record ------
Person(firstname=fn, lastname=LN)
source-type : kafka
id : 420355ec-2875-92da-cd83-d0d577142ec3
contentType : application/json
timestamp : 1694122858406

Even running a test suite without the @EmbeddedKafka still starts the app and produces similar messages, when I would expect the app to crash saying no broker was found, similar to when I run the application itself without running local kafka first.

package com.example.stream;

import ...

/**
 * IT tests without any Kafka
 */
@SpringBootTest(classes = Application.class)
@ActiveProfiles(profiles = {"default", "test"})
@Slf4j
class PersonStreamWithoutKafka {

    @Test
    public void testAppStartsWithoutKafka() throws InterruptedException {
        Thread.sleep(5000);
        //THIS SHOULD NOT PROCESS MESSAGE, KAFKA IS NOT RUNNING.
    }
}

Here is my test class attempting to validate a message was emitted by the producer, but never receives any message.

package com.example.stream;

import ...

/**
 * IT tests using EmbeddedKafka
 */
@SpringBootTest(classes = Application.class)
@ActiveProfiles(profiles = {"default", "test"})
@EmbeddedKafka
@Slf4j
class PersonStreamIT {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;


    @Test
    public void testAppProducer() {
        //Assert message
        Map<String, Object> consumerConfigs = new HashMap<>(KafkaTestUtils.consumerProps("test", "false", embeddedKafkaBroker));
        try (JsonDeserializer<Person> jsonDeserializer = new JsonDeserializer<Person>().trustedPackages("*");
             Consumer<String, Person> consumer = new DefaultKafkaConsumerFactory<>(consumerConfigs, new StringDeserializer(), jsonDeserializer).createConsumer()) {
            consumer.subscribe(List.of("source-out-0"));
            ConsumerRecord<String, Person> record = KafkaTestUtils.getSingleRecord(consumer, "source-out-0", Duration.ofSeconds(2));
            Assertions.assertThat(record.value()).isEqualTo(Person.builder().firstname("ln").lastname("fn").build());
        }
    }
}

For reference, here is the application.yml configuration and implementation

spring:
  config:
    activate:
      on-profile: default

  cloud:
    function:
      definition: source;uppercase;sink

    stream:
      bindings:
        uppercase-in-0:
          destination: source-out-0
        uppercase-out-0:
          destination: sink-in-0
      kafka:
        binder:
          brokers: localhost:9092

---
spring:
  config:
    activate:
      on-profile: test

  cloud:
    stream:
      kafka:
        binder:
          brokers: ${spring.embedded.kafka.brokers}
package com.example.stream;

import ...

@Service
@Slf4j
@EnableAutoConfiguration
public class PersonStream {

    @Bean
    public Supplier<Person> source() {
        return () -> Person.builder()
                .firstname("fn")
                .lastname("ln")
                .build();
    }

    @Bean
    public Function<Person, Person> uppercase() {
        return p -> {
            p.setLastname(p.getLastname().toUpperCase());
            return p;
        };
    }

    @Bean
    public Consumer<Message<Person>> sink() {
        return m -> {
            log.info("------ Consumed Record ------");
            log.info(m.getPayload().toString());
            m.getHeaders().forEach((key, value) -> log.info("{} : {}", key, value.toString()));
        };
    }
}

Thank you very much for your help.


Solution

  • Not sure if that is what you would expected according to a number of your various tests, but another solution is to suppress Test binder:

    spring:
      cloud:
        stream:
          default-binder: kafka
    

    With this I see in the logs:

    2023-09-12T16:18:42.347-04:00  INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream          : ------ Consumed Record ------
    2023-09-12T16:18:42.347-04:00  INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream          : Person(firstname=fn, lastname=LN)
    2023-09-12T16:18:42.348-04:00  INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream          : deliveryAttempt : 1
    2023-09-12T16:18:42.348-04:00  INFO 18544 --- [quest-handler-0] k.coordinator.group.GroupCoordinator     : [GroupCoordinator 0]: Group test with generation 2 is now empty (__consumer_offsets-3)
    2023-09-12T16:18:42.348-04:00  INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream          : kafka_timestampType : CREATE_TIME
    2023-09-12T16:18:42.348-04:00  INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream          : kafka_receivedTopic : sink-in-0
    2023-09-12T16:18:42.348-04:00  INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream          : kafka_offset : 0
    2023-09-12T16:18:42.348-04:00  INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream          : scst_nativeHeadersPresent : true
    2023-09-12T16:18:42.348-04:00  INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream          : kafka_consumer : org.apache.kafka.clients.consumer.KafkaConsumer@2f8f1eb7
    2023-09-12T16:18:42.348-04:00  INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream          : source-type : kafka
    2023-09-12T16:18:42.348-04:00  INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream          : id : 6b38368c-392b-d808-51e8-45f70fa78ce6
    2023-09-12T16:18:42.348-04:00  INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream          : kafka_receivedPartitionId : 0
    2023-09-12T16:18:42.348-04:00  INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream          : contentType : application/json
    2023-09-12T16:18:42.348-04:00  INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream          : kafka_receivedTimestamp : 1694549922341
    2023-09-12T16:18:42.348-04:00  INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream          : kafka_groupId : anonymous.3690f709-b5c8-40b7-9a17-57ad6b71aadb
    2023-09-12T16:18:42.348-04:00  INFO 18544 --- [container-0-C-1] com.example.stream.PersonStream          : timestamp : 1694549922347
    

    Unlike without that property:

    2023-09-12T16:23:25.764-04:00  INFO 24564 --- [   scheduling-1] com.example.stream.PersonStream          : ------ Consumed Record ------
    2023-09-12T16:23:25.764-04:00  INFO 24564 --- [   scheduling-1] com.example.stream.PersonStream          : Person(firstname=fn, lastname=LN)
    2023-09-12T16:23:25.764-04:00  INFO 24564 --- [   scheduling-1] com.example.stream.PersonStream          : source-type : kafka
    2023-09-12T16:23:25.764-04:00  INFO 24564 --- [   scheduling-1] com.example.stream.PersonStream          : id : 541bc466-613e-64f3-ff08-5badf8d35d87
    2023-09-12T16:23:25.764-04:00  INFO 24564 --- [   scheduling-1] com.example.stream.PersonStream          : contentType : application/json
    2023-09-12T16:23:25.764-04:00  INFO 24564 --- [   scheduling-1] com.example.stream.PersonStream          : timestamp : 1694550205764
    

    Test are still failing though, but that should be a fully different story.