I'm newbie in kafka streams. I need to create kafka streams dynamically from config files, which contain source and destination topic names. Is it possible to restart and stop Kafka streams? My goal is transferring messages from one topic to another periodically using kafka streams. I used spring cron job and tried closing and opening stream but I can't start it again when I close a stream. I got this error --> The client is either already started or already stopped, cannot re-start. I'm writing the code in java
+--------------+
+<----- | Created (0) |
| +-----+--------+
| |
| v
| +----+--+------+
| | Re- |
+<----- | Balancing (1)| -------->+
| +-----+-+------+ |
| | ^ |
| v | |
| +--------------+ v
| | Running (2) | -------->+
| +------+-------+ |
| | |
| v |
| +------+-------+ +----+-------+
+-----> | Pending |<--- | Error (5) |
| Shutdown (3) | +------------+
+------+-------+
|
v
+------+-------+
| Not |
| Running (4) |
+--------------+
There are two methods pause
and resume
on the KafkaStreams
class that you can use to pause and resume processing.
https://kafka.apache.org/33/javadoc/org/apache/kafka/streams/KafkaStreams.html#pause-- https://kafka.apache.org/33/javadoc/org/apache/kafka/streams/KafkaStreams.html#resume--
You could use the scheduleAtFixedRate
method of the java.util.concurrent.ScheduledExecutorService
to schedule pausing and resuming every 5 minutes.