Search code examples
junitapache-kafkaspring-cloudspring-kafkaspring-cloud-stream

How to create unit test with kafka embedded in the spring cloud stream


Sorry for such a generic question, but does anyone have a tutorial or guide on how to test a producer and a consumer with embedded Kafka? I've tried several, but they all rely on different dependency versions and none of them actually works =/

I'm using spring cloud stream kafka.


Solution

  • We generally recommend using the Test Binder in tests but if you want to use an embedded kafka server, it can be done...

    Add this to your POM...

    <!-- Test-scoped dependency added to support the embedded Kafka server used by the test case.
         No <version> is declared here; presumably it is managed by the parent POM / BOM (confirm). -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
    

    Test app...

    /**
     * Minimal Spring Cloud Stream processor application: every payload received on the
     * {@code Processor.INPUT} binding is upper-cased and sent to {@code Processor.OUTPUT}.
     */
    @SpringBootApplication
    @EnableBinding(Processor.class)
    public class So43330544Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So43330544Application.class, args);
        }
    
        /**
         * Upper-cases the incoming payload.
         *
         * <p>The bytes are decoded and re-encoded as UTF-8 and the case conversion uses
         * {@code Locale.ROOT}, so the result does not depend on the platform default
         * charset or the JVM default locale (for example, a Turkish default locale would
         * otherwise turn {@code "i"} into a dotted capital I).
         *
         * @param in the raw message payload, interpreted as UTF-8 text
         * @return the upper-cased text encoded as UTF-8 bytes
         */
        @StreamListener(Processor.INPUT)
        @SendTo(Processor.OUTPUT)
        public byte[] handle(byte[] in) {
            String text = new String(in, java.nio.charset.StandardCharsets.UTF_8);
            return text.toUpperCase(java.util.Locale.ROOT)
                    .getBytes(java.nio.charset.StandardCharsets.UTF_8);
        }
    
    }
    

    application.properties...

    # Destination (topic) for the output binding; the test case consumes from "so0544out".
    spring.cloud.stream.bindings.output.destination=so0544out
    # Destination (topic) for the input binding; the test case sends to "so0544in".
    spring.cloud.stream.bindings.input.destination=so0544in
    # NOTE(review): headerMode=raw presumably makes the binder send/receive the payload bytes
    # as-is, without embedding headers, so the plain KafkaTemplate/Consumer in the test can
    # exchange raw byte[] with the app -- confirm against the Kafka binder documentation.
    spring.cloud.stream.bindings.output.producer.headerMode=raw
    spring.cloud.stream.bindings.input.consumer.headerMode=raw
    # Consumer group for the input binding (the test uses its own, separate group id).
    spring.cloud.stream.bindings.input.group=so0544
    

    Test case...

    /**
     * Integration test that runs the application against an embedded Kafka broker:
     * it sends a message to the input topic and checks the upper-cased result on the
     * output topic.
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class So43330544ApplicationTests {
    
        /** Embedded Kafka server (constructed with a count of 1), managed as a JUnit class rule. */
        @ClassRule
        public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1);
    
        @Autowired
        private KafkaTemplate<byte[], byte[]> template;
    
        @Autowired
        private KafkaProperties properties;
    
        /**
         * Publishes the embedded broker's address list as the
         * {@code spring.kafka.bootstrap-servers} system property so that the
         * auto-configured Kafka beans connect to the embedded broker.
         */
        @BeforeClass
        public static void setup() {
            System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
        }
    
        /**
         * Sends {@code "foo"} to the input topic and expects exactly one record with the
         * value {@code "FOO"} on the output topic.
         */
        @Test
        public void testSendReceive() {
            template.send("so0544in", "foo".getBytes(java.nio.charset.StandardCharsets.UTF_8));
    
            Map<String, Object> configs = properties.buildConsumerProperties();
            configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test0544");
            // Start from the beginning of the topic: the reply may already have been
            // written before this consumer gets its partition assignment.
            configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            ConsumerFactory<byte[], byte[]> cf = new DefaultKafkaConsumerFactory<>(configs);
    
            // try-with-resources closes the consumer even when an assertion fails, so the
            // test does not leak the consumer or leave a stale member in the group.
            try (Consumer<byte[], byte[]> consumer = cf.createConsumer()) {
                consumer.subscribe(Collections.singleton("so0544out"));
                ConsumerRecords<byte[], byte[]> records = consumer.poll(10_000);
                consumer.commitSync();
                assertThat(records.count()).isEqualTo(1);
                String received = new String(records.iterator().next().value(),
                        java.nio.charset.StandardCharsets.UTF_8);
                assertThat(received).isEqualTo("FOO");
            }
        }
    
    }