I have a typical Samza task that consumes two topics: data and config. It stores messages from config as local state in RocksDB and uses that state to check whether messages from data are valid.
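Roughly, my task looks like this (the stream name "config", the store name "config-store", and the string types are simplified, and the actual validation logic is omitted):

import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class ValidationTask implements StreamTask, InitableTask {
  private KeyValueStore<String, String> configStore;

  @Override
  @SuppressWarnings("unchecked")
  public void init(Config config, TaskContext context) {
    // "config-store" is a RocksDB-backed store defined in the job config.
    configStore = (KeyValueStore<String, String>) context.getStore("config-store");
  }

  @Override
  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    String stream = envelope.getSystemStreamPartition().getStream();
    if ("config".equals(stream)) {
      // Remember the config message in this task's local RocksDB store.
      configStore.put((String) envelope.getKey(), (String) envelope.getMessage());
    } else {
      // Look up the config entry that applies to this data message.
      String cfg = configStore.get((String) envelope.getKey());
      // ... check the data message against cfg ...
    }
  }
}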
This task works fine when each of the two topics has only one partition. Once I split data into ten partitions while config remains a single partition, things changed. By default, Samza creates ten tasks to consume partitions 0 ~ 9 of the data topic, and only task 0 consumes the config topic:
task[0] -> config, data[0]
task[1] -> data[1]
...
task[9] -> data[9]
It seems each task is initialized with its own RocksDB instance, so only task[0] stores all the config data in its RocksDB instance; tasks [1~9] have no config data and are therefore unable to find the config information for their incoming data.
What I expected is for each task to consume messages from its own data partition plus the whole config stream, like this:
task[0] -> config, data[0]
task[1] -> config, data[1]
...
task[9] -> config, data[9]
Is there any way to achieve this?
The distribution of the input streams' partitions across tasks is governed by a pluggable grouper, configured via "job.systemstreampartition.grouper.factory". This class groups the incoming stream partitions across task instances; by default, I believe it groups by partition id (GroupByPartition). That is why you are seeing data[0] and config[0] in task[0].
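For reference, the default wiring corresponds to something like this in the job properties (the factory class name is my understanding of the Samza default, so double-check it against your version):

job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.GroupByPartitionFactory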
You can implement a custom SSPGrouper. However, what you are looking for is to treat the "data" stream as a regular input stream and the "config" stream as a "broadcast" input stream. Broadcast means every task in the Samza job reads from this stream's partitions. This way, each task instance can populate its local RocksDB with the config stream's data. You can configure broadcast streams as:
task.broadcast.inputs=<systemName>.<streamName>#[<partition-range>], <systemName>.<streamName>#[<partition-range>]
For your case, you can configure:
task.inputs=<systemName>.data
task.broadcast.inputs=<systemName>.config#0
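Putting it together with the store definition, the relevant part of the job properties could look like this (the system name "kafka", the store name "config-store", and the string serdes are illustrative assumptions, not something from your setup):

task.inputs=kafka.data
task.broadcast.inputs=kafka.config#0
stores.config-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.config-store.key.serde=string
stores.config-store.msg.serde=string
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

With this, every one of the ten tasks reads config partition 0 in addition to its own data partition, so each task's local RocksDB copy ends up holding the full config data.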
Check out Broadcast Streams in Samza