I have a question which builds on this question: Filter messages before deserialization based on headers
I'd like to filter by Kafka consumer record header using the Spring Integration DSL.
Currently I have this flow:
@Bean
IntegrationFlow readTicketsFlow(KafkaProperties kafkaProperties,
                                ObjectMapper jacksonObjectMapper,
                                EventService<Ticket> service) {
    Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
    DefaultKafkaConsumerFactory<String, String> consumerFactory =
            new DefaultKafkaConsumerFactory<>(consumerProperties);
    return IntegrationFlows.from(
                    Kafka.messageDrivenChannelAdapter(consumerFactory, TICKET_TOPIC))
            .transform(fromJson(Ticket.class, new Jackson2JsonObjectMapper(jacksonObjectMapper)))
            .handle(service)
            .get();
}
How can I register org.springframework.kafka.listener.adapter.RecordFilterStrategy in this flow?
You can simply add a .filter() element to the flow.
.filter("!'bar'.equals(headers['foo'])")
This will filter out (ignore) any message with a header named foo equal to bar.
Note that Spring Kafka's RecordFilterStrategy has the reverse sense of Spring Integration filters:
public interface RecordFilterStrategy<K, V> {

    /**
     * Return true if the record should be discarded.
     * @param consumerRecord the record.
     * @return true to discard.
     */
    boolean filter(ConsumerRecord<K, V> consumerRecord);

}
Spring Integration filters discard messages if the filter returns false.
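To make the difference in sense concrete, here is a minimal sketch in plain Java. It uses java.util.function.Predicate as a stand-in for both kinds of filter and a Map<String, byte[]> as a stand-in for record headers; it does not use any Spring or Kafka classes, and the names FilterSenseDemo, DISCARD_IF_FOO_IS_BAR, KEEP_UNLESS_FOO_IS_BAR and keepAccepted are hypothetical, chosen only for illustration. It also assumes header values are UTF-8 text.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class FilterSenseDemo {

    // RecordFilterStrategy-style sense: true means DISCARD the record.
    // The map is a hypothetical stand-in for record headers (name -> raw bytes).
    public static final Predicate<Map<String, byte[]>> DISCARD_IF_FOO_IS_BAR = headers -> {
        byte[] value = headers.get("foo");
        // Assumption: header values are UTF-8 encoded text.
        return value != null && "bar".equals(new String(value, StandardCharsets.UTF_8));
    };

    // Integration-filter-style sense: true means KEEP the message.
    // Negating the discard predicate converts one sense into the other.
    public static final Predicate<Map<String, byte[]>> KEEP_UNLESS_FOO_IS_BAR =
            DISCARD_IF_FOO_IS_BAR.negate();

    // Keeps only the header maps accepted by the "keep" predicate.
    public static List<Map<String, byte[]>> keepAccepted(List<Map<String, byte[]>> records) {
        return records.stream()
                .filter(KEEP_UNLESS_FOO_IS_BAR)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, byte[]> fooIsBar = Map.of("foo", "bar".getBytes(StandardCharsets.UTF_8));
        Map<String, byte[]> fooIsBaz = Map.of("foo", "baz".getBytes(StandardCharsets.UTF_8));
        Map<String, byte[]> noHeaders = Map.of();

        List<Map<String, byte[]>> kept = keepAccepted(List.of(fooIsBar, fooIsBaz, noHeaders));
        System.out.println("kept " + kept.size() + " of 3 records");
    }
}
```

The same predicate logic can therefore serve either API, as long as the result is negated when moving from one to the other.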
EDIT
Or you can add a RecordFilterStrategy to the channel adapter.
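return IntegrationFlows
        .from(Kafka.messageDrivenChannelAdapter(consumerFactory(), TEST_TOPIC1)
                .recordFilterStrategy(record -> {
                    // Return true to discard the record (Spring Kafka's sense).
                    Header header = record.headers().lastHeader("foo");
                    // Assumes the header value, when present, is UTF-8 text.
                    return header != null && header.value() != null
                            && "bar".equals(new String(header.value(), StandardCharsets.UTF_8));
                })
        ...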