java, apache-kafka, apache-kafka-streams, spring-kafka

How can I pause and resume stream processing periodically (every 5 minutes) using Kafka Streams and Spring Kafka Streams?


I'm new to Kafka Streams. I need to create Kafka Streams instances dynamically from config files, which contain the source and destination topic names. Is it possible to stop and restart a Kafka Streams instance? My goal is to transfer messages from one topic to another periodically using Kafka Streams. I used a Spring cron job and tried closing and reopening the stream, but I can't start it again once it has been closed. I get this error: "The client is either already started or already stopped, cannot re-start". I'm writing the code in Java.

                       +--------------+
               +<----- | Created (0)  |
               |       +-----+--------+
               |             |
               |             v
               |       +----+--+------+
               |       | Re-          |
               +<----- | Balancing (1)| -------->+
               |       +-----+-+------+          |
               |             | ^                 |
               |             v |                 |
               |       +--------------+          v
               |       | Running (2)  | -------->+
               |       +------+-------+          |
               |              |                  |
               |              v                  |
               |       +------+-------+     +----+-------+
               +-----> | Pending      |<--- | Error (5)  |
                       | Shutdown (3) |     +------------+
                       +------+-------+
                              |
                              v
                       +------+-------+
                       | Not          |
                       | Running (4)  |
                       +--------------+

Solution

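  • A KafkaStreams instance cannot be started again after it has been closed, which is why you get that error. Instead of closing it, you can use the two methods pause() and resume() on the KafkaStreams class to pause and resume processing on the same instance.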

    https://kafka.apache.org/33/javadoc/org/apache/kafka/streams/KafkaStreams.html#pause--
    https://kafka.apache.org/33/javadoc/org/apache/kafka/streams/KafkaStreams.html#resume--

    You could use the scheduleAtFixedRate method of java.util.concurrent.ScheduledExecutorService to alternate between pausing and resuming every 5 minutes.
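
A minimal sketch of that scheduling idea, assuming Kafka Streams 3.3 or later for pause() and resume(). PeriodicPauseResume is a hypothetical helper, not part of Kafka Streams or Spring. It takes the pause and resume actions as plain Runnables so that the sketch compiles with the JDK alone.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * Alternates between a "pause" action and a "resume" action at a fixed rate.
 * Hypothetical helper for illustration; the two actions are assumed to be
 * KafkaStreams::pause and KafkaStreams::resume on an already started instance.
 */
final class PeriodicPauseResume implements AutoCloseable {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final Runnable pauseAction;
    private final Runnable resumeAction;
    private boolean paused = false; // processing is assumed to be running initially

    PeriodicPauseResume(Runnable pauseAction, Runnable resumeAction) {
        this.pauseAction = pauseAction;
        this.resumeAction = resumeAction;
    }

    /** Flips the state once: running -> paused, or paused -> running. */
    synchronized void toggle() {
        if (paused) {
            resumeAction.run();
            paused = false;
        } else {
            pauseAction.run();
            paused = true;
        }
    }

    synchronized boolean isPaused() {
        return paused;
    }

    /**
     * Schedules toggle() every {@code period}. The first toggle happens after
     * one full period, so processing runs for the first interval.
     */
    ScheduledFuture<?> start(long period, TimeUnit unit) {
        return scheduler.scheduleAtFixedRate(this::safeToggle, period, period, unit);
    }

    // scheduleAtFixedRate cancels all further runs if a run throws,
    // so exceptions are caught and reported here instead of propagating.
    private void safeToggle() {
        try {
            toggle();
        } catch (RuntimeException e) {
            System.err.println("Pause/resume toggle failed: " + e);
        }
    }

    /** Stops the scheduler; it does not touch the underlying stream. */
    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

With a started KafkaStreams instance named streams (an assumed variable), this could be wired up as new PeriodicPauseResume(streams::pause, streams::resume).start(5, TimeUnit.MINUTES). Close the helper before closing the streams instance so that no toggle runs against a closed client.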