
Using EmbeddedKafkaBroker with Spring Integration and Spring Kafka


I want to use EmbeddedKafkaBroker to test a flow that involves KafkaMessageDrivenChannelAdapter.
It looks like the consumer starts correctly and subscribes to the topic, but the handler is not triggered after I push a message to the EmbeddedKafkaBroker.

@SpringBootTest(properties = {"...."},  classes = {....class})
@EmbeddedKafka
class IntTests {
 @BeforeAll
 static void setup() {
    embeddedKafka = new EmbeddedKafkaBroker(1, true, TOPIC);
    embeddedKafka.kafkaPorts(57412);
    embeddedKafka.afterPropertiesSet();
 }

 @Test
 void testit() throws InterruptedException {
    String ip = embeddedKafka.getBrokersAsString();
    Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafka));
    Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();
        // Act
    producer.send(new ProducerRecord<>(TOPIC, "key", "{\"name\":\"Test\"}"));
    producer.flush();

    ....

}
...
}

And the main class:

    @Configuration
    public class Kafka {

   
    
     @Bean
     public KafkaMessageDrivenChannelAdapter<String, String> adapter(KafkaMessageListenerContainer<String, String> container) {
        KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =..
        kafkaMessageDrivenChannelAdapter.setOutputChannelName("kafkaChannel");
        return kafkaMessageDrivenChannelAdapter;
     }
    
     @Bean
     public KafkaMessageListenerContainer<String, String> container() {
        ContainerProperties properties = new ContainerProperties(TOPIC);        
        KafkaMessageListenerContainer<String, String> kafkaContainer = ...;
        return kafkaContainer;
     }
    
    
     @Bean
     public ConsumerFactory<String, String> consumerFactory() {
       Map<String, Object> props = new HashMap<>();
       props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:57412");
       props.put(ConsumerConfig.GROUP_ID_CONFIG, "group12");
    
       ...
           
       return new DefaultKafkaConsumerFactory<>(props); 
     }
    

    @Bean
    public PublishSubscribeChannel kafkaChannel() {
        return new PublishSubscribeChannel ();
    }

   @Bean
    @ServiceActivator(inputChannel = "kafkaChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                
            }
        };
    }
    ...
    
    }

In the log I do see:

clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-group12-1, groupId=group12] Subscribed to topic(s): TOPIC
ThreadPoolTaskScheduler          : Initializing ExecutorService
KafkaMessageDrivenChannelAdapter : started bean 'adapter'; defined in: 'com.example.demo.demo.Kafka';

Solution

  • With both embeddedKafka = new EmbeddedKafkaBroker(1, true, TOPIC); and @EmbeddedKafka, you essentially start two separate Kafka clusters. See the ports option of @EmbeddedKafka if you want a fixed port for the embedded broker instead of a random one. Still, it is better to rely on what Spring Boot provides with its auto-configuration.

    See documentation for more info: https://docs.spring.io/spring-boot/docs/current/reference/html/spring-boot-features.html#boot-features-embedded-kafka. Pay attention to the bootstrapServersProperty = "spring.kafka.bootstrap-servers" property.

    UPDATE

    In your test you have @SpringBootTest(classes = {Kafka.class}). When I removed that classes attribute, everything started to work. The problem is that your config class is not auto-configuration aware, so Spring Integration is not initialized properly and the message is not consumed from the channel. There might be other effects as well. But still: it is better to rely on the auto-configuration, so let your test see the @SpringBootApplication annotation.
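
Putting both points together, the test could look roughly like the sketch below. It is an illustration and not the asker's actual code: the topic name "TOPIC" is a placeholder, and it assumes the application has a @SpringBootApplication class that the test can discover. It also assumes the consumerFactory bean reads its bootstrap servers from spring.kafka.bootstrap-servers rather than the hard-coded 127.0.0.1:57412; alternatively, ports = 57412 on @EmbeddedKafka would keep the fixed port.

```java
import java.util.Map;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;

// No "classes" attribute: the test locates the @SpringBootApplication class,
// so auto-configuration (including Spring Integration) is applied.
@SpringBootTest
// The annotation starts the only embedded broker; there is no manual
// "new EmbeddedKafkaBroker(...)" in a @BeforeAll method any more.
// The broker address is published under spring.kafka.bootstrap-servers.
@EmbeddedKafka(topics = IntTests.TOPIC,
        bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class IntTests {

    // Placeholder topic name (assumption); use the application's real topic.
    static final String TOPIC = "TOPIC";

    // The broker bean registered by @EmbeddedKafka.
    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    void testit() {
        // Producer properties pointing at the annotation-managed broker.
        Map<String, Object> configs = KafkaTestUtils.producerProps(embeddedKafka);
        try (Producer<String, String> producer = new DefaultKafkaProducerFactory<>(
                configs, new StringSerializer(), new StringSerializer()).createProducer()) {
            producer.send(new ProducerRecord<>(TOPIC, "key", "{\"name\":\"Test\"}"));
            producer.flush();
        }
        // Assertions on the handler's side effect would go here.
    }
}
```

With a single broker, the producer in the test and the KafkaMessageDrivenChannelAdapter in the application talk to the same cluster, provided the consumer configuration points at the same address.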