Search code examples
javaspringapache-kafkaspring-integration

RecordFilterStrategy with int-kafka:message-driven-channel-adapter


Is there a way to inject a listener with custom RecordFilterStategy?

<int-kafka:message-driven-channel-adapter
        id="kafkaConsumer"
        listener-container="listenerContainer"
        channel="consumerChannel"
        message-converter="messageConverter"
        payload-type="java.lang.String"
/>

I've tried to do the following:

<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
    <constructor-arg name="topics">
        <list>
            <value>someTopic</value>
        </list>
    </constructor-arg>
    <property name="errorHandler" ref="listenerErrorHandler"/>
    <property name="messageListener" ref="filteringMessageListener"/>
</bean>

<bean id="recordFilterStrategy" class="com.some.path"/>

<bean id="filteringMessageListener" class="org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter">
    <constructor-arg>
        <bean factory-bean="containerProperties" factory-method="getMessageListener"/>
    </constructor-arg>
    <constructor-arg ref="recordFilterStrategy"/>
</bean>

But I got en error: Constructor threw exception; nested exception is java.lang.IllegalArgumentException: Container must not already have a listener


Solution

  • It is a bug in the XML configuration.

    The KafkaMessageDrivenChannelAdapter has a respective property:

    /**
     * Specify a {@link RecordFilterStrategy} to wrap
     * {@link KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener} into
     * {@link FilteringMessageListenerAdapter}.
     * @param recordFilterStrategy the {@link RecordFilterStrategy} to use.
     */
    public void setRecordFilterStrategy(RecordFilterStrategy<K, V> recordFilterStrategy) {
    

    We just missed to expose it for the XML configuration.

    Feel free to raise a GH issue on the matter!

    As a workaround consider to declare a regular <bean> for the KafkaMessageDrivenChannelAdapter instead of that XML config. Or just move away from XML config altogether in favor of Java DSL: https://docs.spring.io/spring-integration/docs/current/reference/html/kafka.html#kafka-inbound-adapter-configuration