I'm having a difficulty understanding how to configure the following.
Since spring-kafka:2.8.4 thee KafkaListener
interface can be configured with a filter which will be applied to all incoming messages, the Javadoc for the filter
method:
/**
* Set an {@link org.springframework.kafka.listener.adapter.RecordFilterStrategy} bean
* name to override the strategy configured on the container factory. If a SpEL
* expression is provided ({@code #{...}}), the expression can either evaluate to a
* {@link org.springframework.kafka.listener.adapter.RecordFilterStrategy} instance or
* a bean name.
* @return the error handler.
* @since 2.8.4
*/
String filter() default "";
RecordFilterStrategy has a single method:
/**
* Return true if the record should be discarded.
* @param consumerRecord the record.
* @return true to discard.
*/
boolean filter(ConsumerRecord<K, V> consumerRecord);
Basically I need to create a kind of a lambda, but I don't understand how to reference the consumerRecord variable, this is what I have already tried:
#{#consumerRecord.key().equals(T(com.example.kafkaconsumer.EventType).CREATE.toString())}
It fails with the exception:
Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1011E:
Method call: Attempted to call method key() on null context object
This is what I'm trying to implement using SPEL:
@Bean
public RecordFilterStrategy<String, Foo> recordFilterStrategy() {
return rec -> !Objects.equals(rec.key(), EventType.CREATE.toString());
}
See that JavaDocs one more time:
Set an {@link org.springframework.kafka.listener.adapter.RecordFilterStrategy} bean
* name
Since you already have a recordFilterStrategy
bean, so that's enough for your to use in that filter()
attribute:
filter = "recordFilterStrategy"
No need to fight with a complex SpEL.