Spring Integration and Kafka: How to filter messages based on message header


This question builds on an earlier one: "Filter messages before deserialization based on headers".

I'd like to filter messages by a Kafka consumer record header using the Spring Integration DSL.

Currently I have this flow:

@Bean
IntegrationFlow readTicketsFlow(KafkaProperties kafkaProperties,
                                ObjectMapper jacksonObjectMapper,
                                EventService<Ticket> service) {
    Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
    DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);

    return IntegrationFlows.from(
            Kafka.messageDrivenChannelAdapter(
                    consumerFactory, TICKET_TOPIC))
            .transform(fromJson(Ticket.class, new Jackson2JsonObjectMapper(jacksonObjectMapper)))
            .handle(service)
            .get();
}

How can I register an org.springframework.kafka.listener.adapter.RecordFilterStrategy in this flow?


Solution

  • You can simply add a .filter() element to the flow.

    .filter("!'bar'.equals(headers['foo'])")
    

    This filters out (discards) any message that has a header named foo with the value bar.

    Note that Spring Kafka's RecordFilterStrategy has the reverse sense of Spring Integration filters: its filter() method returns true to discard a record.

    public interface RecordFilterStrategy<K, V> {
    
        /**
         * Return true if the record should be discarded.
         * @param consumerRecord the record.
         * @return true to discard.
         */
        boolean filter(ConsumerRecord<K, V> consumerRecord);
    
    }
    

    Spring Integration filters discard messages if the filter returns false.

    EDIT

    Alternatively, you can add a RecordFilterStrategy directly to the channel adapter:

    return IntegrationFlows
            .from(Kafka.messageDrivenChannelAdapter(consumerFactory(), TEST_TOPIC1)
                    .recordFilterStrategy(record -> {
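                        // Returning true discards the record.
                        Header header = record.headers().lastHeader("foo");
                        // Decode with an explicit charset (java.nio.charset.StandardCharsets);
                        // a missing header or a null value keeps the record.
                        return header != null && header.value() != null
                                && "bar".equals(new String(header.value(), StandardCharsets.UTF_8));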
                    })
                    ...
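
The two conventions described above are easy to mix up, so here is a minimal, framework-free sketch of the same decision in plain Java. HeaderFilterSketch, headerEquals, DISCARD and ACCEPT are hypothetical names invented for this illustration; they are not part of Spring Kafka or Spring Integration. The sketch also assumes the header value was written as UTF-8 text.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Predicate;

// Hypothetical helper (not a Spring class): models both filter conventions
// on the raw bytes of a Kafka record header.
final class HeaderFilterSketch {

    private HeaderFilterSketch() {
    }

    // Null-safe comparison of a raw header value with an expected string.
    // Assumption: the producer encoded the header value as UTF-8 text.
    static boolean headerEquals(byte[] rawValue, String expected) {
        return rawValue != null
                && expected.equals(new String(rawValue, StandardCharsets.UTF_8));
    }

    // RecordFilterStrategy sense: true means "discard the record".
    static final Predicate<byte[]> DISCARD = raw -> headerEquals(raw, "bar");

    // Spring Integration .filter() sense: true means "let the message through".
    static final Predicate<byte[]> ACCEPT = DISCARD.negate();
}
```

If you move the check from one place to the other, negate the predicate: the logic that returns true in a RecordFilterStrategy must return false in a .filter() selector, and vice versa. A record with no foo header (a null value here) is kept under both conventions.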