Tags: apache-kafka, apache-flink, kafka-consumer-api, flink-streaming

Flink Kafka: Gracefully stop consuming messages from a Kafka source when no messages are received for a time interval


I have added a FlinkKafkaConsumer as a source to my StreamExecutionEnvironment. I would like Flink to stop consuming data when no new messages have been received for a specific time (similar to a Kafka poll timeout). Currently the job runs indefinitely and blocks execution from moving on to the next step (validating the messages). Please suggest a workaround if there is one.

Note: I tried signaling end-of-stream from the deserialization schema (isEndOfStream()), but that won't work because the stream is practically unbounded.

Thanks in advance.


Solution

  • If this is for testing, then one approach is to create your own custom source that "wraps" the FlinkKafkaConsumer. Your source's run() method would call the Kafka source's run() method from a separate thread, passing in a SourceContext (the collector) that wraps the real one and updates a "last collected time" whenever a record is collected. Your source's run() method would then poll this timestamp, call the Kafka source's cancel() method once too much time has elapsed without a new record, and then exit as well.

    Having said all that, for unit testing you'd typically want a mocked source that lets you control exactly what's generated, and when, rather than spinning up a Kafka cluster.
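The wrapping approach can be sketched in plain Java without Flink or Kafka on the classpath. This is a minimal sketch under stated assumptions: `SimpleSource` is a hypothetical stand-in for Flink's `SourceFunction`, a `Consumer` plays the role of the `SourceContext` collector, and `IdleTimeoutSource` and `QuietAfterSource` are illustrative names, not Flink API. `QuietAfterSource` doubles as the kind of mocked source mentioned above: it emits a fixed list and then goes quiet without ever finishing, like an idle topic. In a real Flink job the wrapper would presumably implement `SourceFunction`, delegate every `SourceContext` method (including `getCheckpointLock()`) to the real context, and forward the Kafka consumer's lifecycle calls such as `open()`, none of which this sketch covers.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Demo: the mock emits three records, goes quiet, and the wrapper stops it after ~200 ms idle.
public class IdleTimeoutDemo {
    public static void main(String[] args) throws Exception {
        List<String> received = new CopyOnWriteArrayList<>();
        SimpleSource<String> source =
                new IdleTimeoutSource<>(new QuietAfterSource(List.of("a", "b", "c")), 200, 20);
        source.run(received::add); // returns once the idle timeout fires
        System.out.println("received " + received);
    }
}

/** Hypothetical, Flink-free stand-in for SourceFunction (run until done or cancelled). */
interface SimpleSource<T> {
    void run(Consumer<T> collector) throws Exception;
    void cancel();
}

/** Runs an inner source on a worker thread and cancels it after idleTimeoutMs without records. */
final class IdleTimeoutSource<T> implements SimpleSource<T> {
    private final SimpleSource<T> inner;
    private final long idleTimeoutMs;
    private final long pollIntervalMs;
    private volatile boolean cancelled = false;

    IdleTimeoutSource(SimpleSource<T> inner, long idleTimeoutMs, long pollIntervalMs) {
        this.inner = inner;
        this.idleTimeoutMs = idleTimeoutMs;
        this.pollIntervalMs = pollIntervalMs;
    }

    @Override
    public void run(Consumer<T> collector) throws Exception {
        AtomicLong lastCollected = new AtomicLong(System.nanoTime());
        AtomicReference<Throwable> failure = new AtomicReference<>();
        // Wrap the real collector so every record refreshes the "last collected" time.
        Consumer<T> tracking = record -> {
            lastCollected.set(System.nanoTime());
            collector.accept(record);
        };
        Thread worker = new Thread(() -> {
            try {
                inner.run(tracking);
            } catch (Throwable t) {
                failure.set(t);
            }
        }, "inner-source");
        worker.start();

        long idleNanos = TimeUnit.MILLISECONDS.toNanos(idleTimeoutMs);
        // Poll until the inner source finishes, we are cancelled, or it has been idle too long.
        while (worker.isAlive() && !cancelled) {
            if (System.nanoTime() - lastCollected.get() > idleNanos) {
                break;
            }
            worker.join(pollIntervalMs);
        }
        inner.cancel(); // stop the inner source (harmless if it already finished)
        worker.join();  // wait for its run() to return before returning ourselves
        if (failure.get() != null) {
            throw new Exception("inner source failed", failure.get());
        }
    }

    @Override
    public void cancel() {
        cancelled = true;
    }
}

/** Hypothetical mock: emits the given records, then idles until cancelled, like a quiet topic. */
final class QuietAfterSource implements SimpleSource<String> {
    private final List<String> records;
    private volatile boolean running = true;

    QuietAfterSource(List<String> records) {
        this.records = records;
    }

    @Override
    public void run(Consumer<String> collector) throws InterruptedException {
        for (String r : records) {
            if (!running) return;
            collector.accept(r);
            Thread.sleep(10);
        }
        while (running) {
            Thread.sleep(5); // no more data, but the stream never ends on its own
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

Polling with `worker.join(pollIntervalMs)` keeps the design simple, at the cost that the idle check only runs once per poll interval, so the actual stop can overshoot the timeout by roughly one interval plus however long the inner source takes to react to `cancel()`.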