Tags: java, spring, spring-integration, spring-xd, spring-kafka

Spring Integration & Kafka Consumer: Stop message-driven-channel-adapter right after records are successfully fetched


I'm using the following configuration:

  • spring-integration-kafka 2.1.0.RELEASE
  • kafka-clients 0.10.0.1
  • Kafka 0.10.x.x
  • spring-xd-1.3.1.RELEASE

I created a custom Kafka source module for Spring XD. I set up my consumer logic and my message-driven-channel-adapter (which I use together with a control bus to stop the channel adapter). So far so good. I'm also using the Kafka consumer property max.poll.records=10 to fetch at most 10 records per poll.

I would like to make sure that I stop the channel adapter right after all records (in this case, 10 records) have been successfully fetched.

For example: I would like to AVOID stopping the adapter while not all records have been successfully fetched and processed (that is, while some records have not yet been sent to the output channel).

Is there a way to tell that?
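For context on the stop mechanism itself: a Spring Integration control bus evaluates a SpEL command message against the beans in the context, so sending the payload @kafkaInboundChannelAdapterTesting.stop() to input_to_control_bus stops the adapter. The sketch below is a plain-Java stand-in for that flow, not Spring code; the Adapter class and dispatch() method are illustrative only.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Plain-Java stand-in for the control-bus pattern (NOT Spring itself):
// the real control bus evaluates the SpEL payload
// "@kafkaInboundChannelAdapterTesting.stop()" and calls stop() on the
// adapter bean. Adapter and dispatch() below are illustrative only.
public class ControlBusSketch {

    public static class Adapter {
        private final AtomicBoolean running = new AtomicBoolean(true);
        public void stop() { running.set(false); }
        public boolean isRunning() { return running.get(); }
    }

    // Mimics sending a command message to the input_to_control_bus channel.
    public static void dispatch(Adapter adapter, String command) {
        if ("@kafkaInboundChannelAdapterTesting.stop()".equals(command)) {
            adapter.stop();
        }
    }
}
```

In the real stream the command is simply a String payload sent to the control-bus input channel; the control bus does the bean lookup and lifecycle call for you.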

This is my xml config, just in case:

xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
    http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">


<int:channel id="input_to_control_bus" />
<int:channel id="output" />

<context:component-scan base-package="com.kafka.source.logic" />


<int:control-bus id="my_control_bus" input-channel="input_to_control_bus" />

<int-kafka:message-driven-channel-adapter
    id="kafkaInboundChannelAdapterTesting" listener-container="container1"
    auto-startup="false" phase="100" send-timeout="5000" channel="output"
    mode="record" message-converter="messageConverter" />

<bean id="messageConverter"
    class="org.springframework.kafka.support.converter.MessagingMessageConverter" />

<!--Consumer -->
<bean id="container1"
    class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    <entry key="enable.auto.commit" value="false" />
                    <entry key="auto.commit.interval.ms" value="100" />
                    <entry key="session.timeout.ms" value="15000" />
                    <entry key="max.poll.records" value="3" />
                    <entry key="group.id" value="bridge-stream-testing" />
                    <entry key="key.deserializer"
                        value="org.apache.kafka.common.serialization.IntegerDeserializer" />
                    <entry key="value.deserializer"
                        value="org.apache.kafka.common.serialization.StringDeserializer" />
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>

    <constructor-arg>
        <bean class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg name="topics" value="testing-topic" />
        </bean>
    </constructor-arg>
</bean>

[UPDATE N°1] Why do I want to do this? These are the details:

  • I would like to read at most X messages every Y minutes from a Kafka topic.
  • I use max.poll.records to ensure that I'm fetching at most X messages per poll.
  • One scenario that I would like to handle is: what happens if, in one specific poll, I receive fewer than X messages? In that case I should stop the channel adapter right away rather than wait for X messages, since otherwise I would have to wait for a future poll to reach the count of X.

Those are some details about this scenario. There are other scenarios too, but I don't want to mix them into the same SO question.

[UPDATE N°2]

Some thoughts after Artem's answer.

  • What happens if I don't define max.poll.records, and instead just wait until Y minutes have elapsed and X messages have been counted, and then stop the channel?
  • Would some messages be lost because they could not be read, or will the messages that weren't read be picked up when I start the channel again?

I want to avoid keeping messages in memory; that is why I was using a message-driven-channel-adapter together with max.poll.records.


Solution

  • What I can suggest is an AtomicInteger bean that is incremented for each processed record; when you reach the threshold, you call stop() on your kafkaInboundChannelAdapterTesting.
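A minimal sketch of that suggestion, stripped of Spring: an AtomicInteger is bumped once per record, and a stop action fires exactly once when the threshold is reached. The class and method names here are hypothetical; in the real stream, the Runnable would send the @kafkaInboundChannelAdapterTesting.stop() command to the control bus rather than flip a flag.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch (not a Spring API): count processed records and run a
// stop action exactly once when the threshold is reached.
public class RecordCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int threshold;
    private final Runnable stopAdapter;

    public RecordCounter(int threshold, Runnable stopAdapter) {
        this.threshold = threshold;
        this.stopAdapter = stopAdapter;
    }

    // Call this from whatever handles each record coming off the adapter.
    public void onRecord() {
        // incrementAndGet() is atomic, so exactly one caller observes the
        // threshold value even with concurrent record handlers.
        if (count.incrementAndGet() == threshold) {
            // In the XD stream: send "@kafkaInboundChannelAdapterTesting.stop()"
            // to the control-bus input channel here.
            stopAdapter.run();
        }
    }

    public int processed() {
        return count.get();
    }
}
```

Wiring this as a singleton bean and invoking onRecord() from a service activator on the output channel gives the stop-after-X-records behavior the question asks about.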