Search code examples
spring-boot, spring-integration, spring-kafka

Is there any XML configuration available for the inbound channel adapter in Spring Integration Kafka version 3.x?


We are using Spring Integration 5.1.4 and spring-boot-starter-integration 2.1.4 in our application. We use XML configuration because it makes the integration graph easy to see. We now need to read messages from a Kafka topic, so we want to use the latest spring-integration-kafka version (3.1.2.RELEASE) and its Kafka inbound channel adapter. I could find sample XML configurations for spring-integration-kafka 1.x, but I am unable to find an XML configuration for recent versions. If I use the older XML configuration with version 3.x, it throws the error "no declaration can be found for element int-kafka:zookeeper-connect". Can anyone point out what is wrong with respect to the version compatibility matrix, or provide a sample XML configuration for the 3.1.2 Kafka inbound channel adapter that reads from a Kafka topic?

<!-- 1.x-style ZooKeeper connection settings; this is the element named in the reported schema error. -->
<int-kafka:zookeeper-connect id="zookeeperConnect"
    zk-connect="localhost:2181"
    zk-connection-timeout="6000"
    zk-session-timeout="6000"
    zk-sync-time="2000" />

<!-- Polled inbound adapter: references the "consumerContext" bean and targets the channel "inputFromKafka". -->
<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
    channel="inputFromKafka"
    kafka-consumer-context-ref="consumerContext"
    auto-startup="true">
    <int:poller time-unit="MILLISECONDS" fixed-delay="2000" />
</int-kafka:inbound-channel-adapter>

<!-- Consumer settings; referenced by the consumer context through consumer-properties="consumerProperties". -->
<bean class="org.springframework.beans.factory.config.PropertiesFactoryBean"
    id="consumerProperties">
    <property name="properties">
        <props>
            <prop key="auto.offset.reset">smallest</prop>
            <!-- 10M -->
            <prop key="socket.receive.buffer.bytes">10485760</prop>
            <prop key="fetch.message.max.bytes">5242880</prop>
            <prop key="auto.commit.interval.ms">1000</prop>
        </props>
    </property>
</bean>

<!-- Consumer context: a single consumer configuration (group "Group1") declaring the topic "Helloworld-Topic". -->
<int-kafka:consumer-context id="consumerContext"
    consumer-properties="consumerProperties"
    zookeeper-connect="zookeeperConnect"
    consumer-timeout="1000">
    <int-kafka:consumer-configurations>
        <int-kafka:consumer-configuration group-id="Group1"
            max-messages="5000"
            key-decoder="deccoder"
            value-decoder="deccoder">
            <int-kafka:topic streams="3" id="Helloworld-Topic" />
        </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>

<!-- Decoder bean referenced as both key-decoder and value-decoder by the consumer configuration. -->
<bean class="org.springframework.integration.kafka.serializer.common.StringDecoder"
    id="deccoder" />

Solution

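  • See the documentation (a chapter in the Spring for Apache Kafka reference). The 1.x elements int-kafka:zookeeper-connect and int-kafka:consumer-context are no longer declared in the 3.x XML namespace, which is why the "no declaration can be found" error occurs. In 3.x, messages are consumed through a message-driven channel adapter backed by a spring-kafka listener container, for example: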

    <!--
      Message-driven channel adapter wired to the listener container bean "container1".
      Output channel is "someChannel"; the error channel is "errorChannel".
      The retry-template, recovery-callback and error-message-strategy attributes refer to
      beans ("template", "callback", "ems") that are not defined in this snippet.
      NOTE(review): auto-startup is "false" here, so presumably the adapter has to be
      started explicitly (or the attribute changed to "true") before it consumes; confirm.
    -->
    <int-kafka:message-driven-channel-adapter id="kafkaListener"
            listener-container="container1"
            channel="someChannel"
            error-channel="errorChannel"
            error-message-strategy="ems"
            mode="record"
            retry-template="template"
            recovery-callback="callback"
            send-timeout="5000"
            phase="100"
            auto-startup="false" />
    
    <!--
      Listener container referenced by listener-container="container1" on the
      message-driven channel adapter.
      First constructor argument: the consumer factory, built from a map of Kafka consumer properties.
      Second constructor argument: the container properties, here subscribing to the topic "foo".
      With spring-kafka 2.2.x (the line used by spring-integration-kafka 3.1.x) ContainerProperties
      lives in org.springframework.kafka.listener, not in the older ...listener.config package.
    -->
    <bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
        <constructor-arg>
            <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
                <constructor-arg>
                    <map>
                        <entry key="bootstrap.servers" value="localhost:9092" />
                        <!-- A group id is required when the container subscribes by topic name; adjust the value as needed. -->
                        <entry key="group.id" value="Group1" />
                        <!-- Deserializers must be configured because none are passed to the factory constructor. -->
                        <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
                        <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
                        <!-- ... any further Kafka consumer properties go here ... -->
                    </map>
                </constructor-arg>
            </bean>
        </constructor-arg>
        <constructor-arg>
            <bean class="org.springframework.kafka.listener.ContainerProperties">
                <constructor-arg name="topics" value="foo" />
            </bean>
        </constructor-arg>
    </bean>