Tags: regex, apache-kafka, kafka-consumer-api, spring-kafka

What is the best way to filter Kafka messages?


I'm consuming data from a Kafka topic, and each message includes an area code. I need to keep only the messages for certain area codes. Can anyone suggest the best approach?

Here is what my listener code looks like. Is it best practice to parse the payload into an object (I map it to a TEST object) and then filter on the field value, or does Kafka provide a library I can use for this filtering?

Kafka Listener Method

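import java.io.IOException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class Listener {

    // Assumes SLF4J; the logger declaration was not shown in the original snippet.
    private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class);

    // ObjectMapper is thread-safe once configured, so build it once and reuse it.
    private final ObjectMapper objectMapper = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    @KafkaListener(topics = "#{@topicName}")
    public void listen(String payload) throws IOException {
        LOGGER.info("received payload='{}'", payload);

        // Map the JSON payload onto the TEST object.
        TEST test = objectMapper.readValue(payload, TEST.class);
    }
}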

My Kafka Configuration class:

@Configuration
public class Config {
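
    // applicationConfiguration is the application's own settings bean; its declaration is omitted here.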
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, applicationConfiguration.getKafkaBootStrap());
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, applicationConfiguration.getKafkaKeyDeserializer());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, applicationConfiguration.getKafkaValueDeserializer());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, applicationConfiguration.getKafkaGroupId());
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, applicationConfiguration.getKafkaAutoOffsetReset());
        return properties;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
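    }

    // Note: Listener is already annotated with @Service; if it is also picked up by
    // component scanning, this @Bean registers a second instance of the listener.
    @Bean
    public Listener receiver() {
        return new Listener();
    }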

}

Solution

  • See the "Filtering Messages" section of the Spring for Apache Kafka reference documentation:

    The Spring for Apache Kafka project also provides some assistance by means of the FilteringMessageListenerAdapter class, which can wrap your MessageListener. This class takes an implementation of RecordFilterStrategy in which you implement the filter method to signal that a message is a duplicate and should be discarded. This has an additional property called ackDiscarded, which indicates whether the adapter should acknowledge the discarded record. It is false by default.

    When you use @KafkaListener, set the RecordFilterStrategy (and optionally ackDiscarded) on the container factory so that the listener is wrapped in the appropriate filtering adapter.

    The setter on the container factory, and the RecordFilterStrategy interface it accepts, look like this:

    /**
     * Set the record filter strategy.
     * @param recordFilterStrategy the strategy.
     */
    public void setRecordFilterStrategy(RecordFilterStrategy<? super K, ? super V> recordFilterStrategy) {
        this.recordFilterStrategy = recordFilterStrategy;
    }
    
    /**
     * Implementations of this interface can signal that a record about
     * to be delivered to a message listener should be discarded instead
     * of being delivered.
     *
     * @param <K> the key type.
     * @param <V> the value type.
     *
     * @author Gary Russell
     *
     */
    public interface RecordFilterStrategy<K, V> {
    
        /**
         * Return true if the record should be discarded.
         * @param consumerRecord the record.
         * @return true to discard.
         */
        boolean filter(ConsumerRecord<K, V> consumerRecord);
    
    }
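
For the area-code case in the question, the discard decision could be sketched as below. This is only an illustration under stated assumptions, not the asker's actual code: the JSON field name `areaCode`, the class name `AreaCodeFilter`, and the method name `shouldDiscard` are all hypothetical, and the payload is assumed to be a JSON string. It uses a regular expression as a cheap pre-check so that discarded messages are never fully deserialized; a real JSON parser would be more robust if the payload structure is complex.

```java
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Decides whether a JSON payload should be discarded based on its area code.
 * The field name "areaCode" and the allow-list are assumptions for illustration.
 */
final class AreaCodeFilter {

    // Matches "areaCode": "212" or "areaCode": 212 (hypothetical field name).
    private static final Pattern AREA_CODE =
            Pattern.compile("\"areaCode\"\\s*:\\s*\"?(\\w+)\"?");

    private final Set<String> allowedAreaCodes;

    AreaCodeFilter(Set<String> allowedAreaCodes) {
        this.allowedAreaCodes = Set.copyOf(allowedAreaCodes);
    }

    /**
     * Follows the RecordFilterStrategy.filter convention:
     * returns true when the record should be DISCARDED.
     */
    boolean shouldDiscard(String payload) {
        if (payload == null) {
            return true; // nothing to inspect, e.g. a record with a null value
        }
        Matcher matcher = AREA_CODE.matcher(payload);
        // Discard when the field is missing or its value is not in the allow-list.
        return !matcher.find() || !allowedAreaCodes.contains(matcher.group(1));
    }
}
```

In the `kafkaListenerContainerFactory()` bean from the question, a filter like this could be applied with `factory.setRecordFilterStrategy(record -> areaCodeFilter.shouldDiscard(record.value()));` before `return factory;`, where `areaCodeFilter` is an instance of the class above. The listener method then only receives records whose area code is in the allow-list. Note the inverted sense: returning `true` means discard, not keep.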