Search code examples

How to do integration testing of KStream topology using spring-boot EmbeddedKafka?

I have a simple spring-boot KStream topology that transforms a string from lowercase to uppercase. I want my integration test to launch an embedded kafka, and then test the topology. I would like to know if it possible to write integration tests like these using spring @EmbeddedKafka?

I have seen several examples using @EmbeddedKafka with simple consumers using @KafkaListener but not any that uses KStream.

I tried attempting to test the following topology to transform from incoming text stream from lowercase to uppercase.

Here's the topology:

    public class UppercaseStream {
        private static final String LOWERCASE_TOPIC = "";
        private static final String UPPERCASE_TOPIC = "";
        public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {
            KStream<String, String> sourceStream = builder
                    .stream(LOWERCASE_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
            sourceStream.print(Printed.<String, String>toSysOut().withLabel("Original KStream..."));
            KStream<String, String> upperCaseStream = sourceStream.mapValues(text -> text.toUpperCase());
            upperCaseStream.print(Printed.<String, String>toSysOut().withLabel("Uppercase KStream..."));
            return upperCaseStream;

The unit test that tests the topology is:

    public class UpperCaseTopologyTest {
        TopologyTestDriver testDriver;
        void tearDown(){
        @DisplayName("should transform lowercase to uppercase words")
        void shouldTransformLowercaseWords() {
            StreamsBuilder builder = new StreamsBuilder();
            new UppercaseStream().kStreamPromoToUppercase(builder);
            Topology topology =;
            // setup test driver
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            //Create a Topology Test Driver 
            testDriver = new TopologyTestDriver(topology, props);
            TestInputTopic<String, String> inputTopic = testDriver.createInputTopic("", new Serdes.StringSerde().serializer(), new Serdes.StringSerde().serializer());
            TestOutputTopic<String, String> outputTopic = testDriver.createOutputTopic("", new Serdes.StringSerde().deserializer(), new Serdes.StringSerde().deserializer());

I want to write an integration test that first launches an embedded kafka server and then test the UppercaseStream topology.

I tried the following:

    @EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
    class EmbeddedKafkaIntegrationTest {
        public KafkaTemplate<String, String> template;
        private KafkaConsumer consumer;
        private KafkaStreams kafkaStreams;
        private String topic;
        private KafkaStreamsConfiguration kafkaStreamsConfiguration;
        public void should_transform_lowercase_to_uppercase() throws Exception {
            //Create a StreamsBuilder
            StreamsBuilder streamsBuilder = new StreamsBuilder();
  , Consumed.with(new Serdes.StringSerde(), new Serdes.StringSerde()));
            //Add a topology
            new UppercaseStream().kStreamPromoToUppercase(streamsBuilder);
            kafkaStreams = new KafkaStreams(, kafkaStreamsConfiguration.asProperties());
            template.send(topic, "test");
            consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
            assertThat(consumer.getLatch().getCount(), equalTo(0L));
            assertThat(consumer.getPayload(), containsString("TEST"));
        public void tearDown() {
            if (kafkaStreams!= null) kafkaStreams.close();

The test fails the assertion. I am not sure how to get kStreamPromoToUppercase bean. I am not sure if I am trying following the correct approach.


  • There were a few things missing from the integration test.

    A couple NewTopic kafka client admin objects were needed to represent an input and an output topics

    @Bean public NewTopic createInputTopic() { return new NewTopic(inputTopic,Optional.of(1), Optional.empty()); } The other one is for the output topic

    @Bean public NewTopic createOutputTopic() { return new NewTopic(outputTopic,Optional.of(1), Optional.empty()); }

    The rest of the test remains more of less the same. As suggested by @Garry I used the kafka consumer.

        @EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
        class KStreamSampleApplicationTests {
            private final KafkaProperties kafkaProperties;
            private final String inputTopic;
            private final String outputTopic;
            public KStreamSampleApplicationTests(KafkaProperties kafkaProperties, Environment env) {
                this.kafkaProperties = kafkaProperties;
                this.inputTopic = env.getProperty("spring.kafka.input-lowercase-topic");
                this.outputTopic = env.getProperty("spring.kafka.output-uppercase-topic");
            @DisplayName("should test uppercaseStream topology")        
            void shouldTestUppercaseStreamTopology() {
                Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(
                        String.join(",", kafkaProperties.getBootstrapServers())));
                //Create a kafka producer
                Producer<String, String> producer = new DefaultKafkaProducerFactory<>(producerProps, new StringSerializer(), new StringSerializer()).createProducer();
                Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(String.join(",", kafkaProperties.getBootstrapServers()), "testGroup", "true");
                consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
                //Create a Consumer client
                Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(consumerProps, new StringDeserializer(), new StringDeserializer()).createConsumer();
                producer.send(new ProducerRecord<>(inputTopic, "test"));
                ConsumerRecords<String, String> rec = consumer.poll(Duration.ofSeconds(3));
                Iterable<ConsumerRecord<String, String>> records = rec.records(outputTopic);
                Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
                if (!iterator.hasNext());
                ConsumerRecord<String, String> next =;

    Here's the gist of the full refactored solution.