
KafkaStreams processor partition re-assignment behavior


Suppose I have a basic KafkaStreams application with one topic (having multiple partitions) and one processor type handling the messages as follows:

    builder.stream(topic)
           .process(() -> new MyProcessor());

Can the following scenario occur or not? For a particular instance of MyProcessor, say M (i.e. the particular java object obtained through invocation of the processor supplier), and for a particular partition of the topic, say P,

  1. at some time t1, M receives messages from P
  2. at a later point t2, P is revoked from M so M does not receive messages from P anymore (e.g. because an extra worker was started which handles P)
  3. at a later point, t3, M receives messages again from P

I checked the documentation on how stream tasks relate to Kafka topic partitions, but I did not find detailed information on how this relates to the construction and deletion of processor instances, or to the (un-)assignment of topic partitions to existing processors, when rebalancing occurs.


Solution

  • In Kafka Streams, the "unit of processing" is called a stream task.

    Tasks can be stateful and/or stateless. When a rebalancing event happens, a task running on one instance (say, M) of your application may be moved to another instance of your application.

    There's a 1-1 mapping between topic partitions and stream tasks, which guarantees that one and only one task will process data from a particular partition. For example, if task 3 is responsible for reading from and processing partition P, and task 3 is moved from instance M to another instance M', then M will stop reading P (because it no longer runs task 3), and M' (where task 3 now runs) will resume/continue to process P.
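
    If you want to observe this mapping at runtime, you can inspect which tasks (and therefore which partitions) the local application instance is currently running. Below is a minimal sketch, assuming Kafka Streams 3.0+ where `KafkaStreams#metadataForLocalThreads()` is available (older versions expose similar information via `localThreadsMetadata()`):

        import org.apache.kafka.streams.KafkaStreams;
        import org.apache.kafka.streams.TaskMetadata;
        import org.apache.kafka.streams.ThreadMetadata;

        // Assumes 'streams' is an already-started KafkaStreams instance.
        public static void printLocalTaskAssignment(KafkaStreams streams) {
            for (ThreadMetadata thread : streams.metadataForLocalThreads()) {
                for (TaskMetadata task : thread.activeTasks()) {
                    // Each active task owns a fixed set of topic partitions;
                    // after a rebalance this assignment may look different.
                    System.out.printf("thread=%s task=%s partitions=%s%n",
                        thread.threadName(), task.taskId(), task.topicPartitions());
                }
            }
        }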

    1. at some time t1, M receives messages from P

    Let's say the stream task responsible for processing the topic partition P is called task(P). At time t1, M happens to be the app instance that is running task(P). This is the situation at point #1 above.

    2. at a later point t2, P is revoked from M so M does not receive messages from P anymore (e.g. because an extra worker was started which handles P)

    Here, another instance of the application (you referred to this instance as an "extra worker") becomes responsible for running task(P): task(P) is migrated automatically from the original app instance M to the new instance M'. Any state that was managed by task(P) (e.g., when the task was doing a stateful operation such as a join or an aggregation) is of course migrated together with the task. When task(P) is migrated, the responsibility for reading from and processing the topic partition P also moves from app instance M to M'.

    Perhaps don't think too much in terms of "which app instance is handling topic partition P?" Rather, a particular partition is always handled by a particular stream task, and stream tasks may move across app instances. (Of course, Kafka's Streams API will prevent unnecessary task migrations to ensure the processing of your application stays efficient.)

    3. at a later point, t3, M receives messages again from P

    This means that, at time t3, M has been assigned task(P) again as a result of another rebalancing event -- perhaps because the other app instance M' was taken down, or something else happened that required a task migration.
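
    One way to see this scenario from inside MyProcessor is to log the task id and source partition in init()/process()/close(). This is only an illustrative sketch (class and field names are made up), written against the newer org.apache.kafka.streams.processor.api interfaces; the older Processor API works the same way in principle. After each rebalance you would see close() on the instance that loses the task and init() on a freshly created processor instance wherever the task now runs:

        import org.apache.kafka.streams.processor.api.Processor;
        import org.apache.kafka.streams.processor.api.ProcessorContext;
        import org.apache.kafka.streams.processor.api.Record;

        public class MyProcessor implements Processor<String, String, Void, Void> {
            private ProcessorContext<Void, Void> context;

            @Override
            public void init(ProcessorContext<Void, Void> context) {
                // Called when the owning stream task is (re)created on this instance.
                this.context = context;
                System.out.println("init, task " + context.taskId());
            }

            @Override
            public void process(Record<String, String> record) {
                // recordMetadata() tells you which topic partition the record came from.
                context.recordMetadata().ifPresent(meta ->
                    System.out.printf("task=%s partition=%d key=%s%n",
                        context.taskId(), meta.partition(), record.key()));
            }

            @Override
            public void close() {
                // Called when the task is closed, e.g. because it migrates to another instance.
                System.out.println("close, task " + context.taskId());
            }
        }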

    Asked in the comments to this answer: it would also be useful to have a sentence or two about state migration. It's not like the binary/physical data is taken from one RocksDB instance and handed over to another; obviously, the state is re-built based on the fault-tolerance mechanism.

    Stateful tasks use state stores to, well, persist state information. These state stores are fault-tolerant, and the source of truth for them is Kafka itself: any change to a state store (e.g., incrementing a counter) is backed up to Kafka in a streaming manner, similar to storing the CDC stream of a database table in Kafka topics (these are normal topics, but they are commonly called 'changelog topics'). Then, when a task dies or is migrated to another container/VM/machine, the task's state store(s) are restored on the task's new container/VM/machine by reading from Kafka (think: streaming backup / streaming restore). This restores the state store(s) to exactly how they looked on the original container, without any data loss or duplication.
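
    For completeness, here is a hedged sketch of how such a fault-tolerant state store is typically wired up (the store name, topic name, and serdes are assumptions, not from the question). Persistent key-value stores created this way are changelog-backed by default, which is what allows them to be restored after a task migration:

        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.kstream.Consumed;
        import org.apache.kafka.streams.kstream.KStream;
        import org.apache.kafka.streams.state.KeyValueStore;
        import org.apache.kafka.streams.state.StoreBuilder;
        import org.apache.kafka.streams.state.Stores;

        StreamsBuilder builder = new StreamsBuilder();

        // A persistent (RocksDB-backed) key-value store; Kafka Streams backs it
        // with a changelog topic by default, which is what makes it fault-tolerant.
        StoreBuilder<KeyValueStore<String, Long>> counterStore =
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("my-counter-store"),
                Serdes.String(),
                Serdes.Long());

        builder.addStateStore(counterStore);

        KStream<String, String> input =
            builder.stream("my-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // The processor can fetch the store via context.getStateStore("my-counter-store");
        // every update to the store is also written to its changelog topic.
        input.process(() -> new MyProcessor(), "my-counter-store");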

    A stream task uses RocksDB to materialize a state store locally (i.e., in the task's container) for performance reasons. Think of these local RocksDB instances as caches that can safely be lost, because the durable storage of the state data is Kafka, as described above.
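
    Related, and purely optional: if restoring large local state after a migration takes too long, Kafka Streams can keep warm copies of task state on other instances via standby replicas. A hedged configuration sketch (property values are just examples):

        import java.util.Properties;
        import org.apache.kafka.streams.StreamsConfig;

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Keep one extra, passively maintained copy of each task's state on another
        // instance, so a migrated task needs far less restoration work before it resumes.
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);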