I have been working with Kafka lately and am a bit confused about consumers under a consumer group. The crux of the confusion is whether to implement consumers as processes or as threads. For this question, assume I am using the high-level consumer.
Let's consider a scenario that I have experimented with. My topic has 2 partitions (for simplicity, assume the replication factor is just 1). I created a consumer (ConsumerConnector) process consumer1 with group group1, then created a topic count map requesting 2 streams for the topic, and then spawned 2 consumer threads, consumer1_thread1 and consumer1_thread2, under that process. It looks like consumer1_thread1 is consuming partition 0 and consumer1_thread2 is consuming partition 1. Is this behaviour always deterministic? Below is the code snippet. The class TestConsumer is my consumer thread class.
...
Map<String, Integer> topicCountMap = new HashMap<>();
// Request 2 streams for this topic (one per consumer thread)
topicCountMap.put(topic, 2);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// One worker thread per stream
executor = Executors.newFixedThreadPool(2);
int threadNumber = 0;
for (final KafkaStream<byte[], byte[]> stream : streams) {
    executor.submit(new TestConsumer(stream, threadNumber));
    threadNumber++;
}
...
Now, let's consider another scenario (which I haven't tried but am curious about) where I start 2 consumer processes, consumer1 and consumer2, both in the same group group1, and each of them is single-threaded. My questions are:
How will the two independent consumer processes (in the same group, nevertheless) be related to the partitions in this case? How is this different from the single-process, multi-threaded scenario above?
In general, how are consumer threads or processes mapped to the partitions of the topic?
The Kafka documentation does say that each consumer in a consumer group will consume one partition. However, does "consumer" there refer to a consumer thread (as in my code example above) or to an independent consumer process?
Is there anything subtle I am missing regarding implementing consumers as processes vs. threads? Thanks in advance.
A consumer group can have multiple consumer instances running (multiple processes with the same group-id). Each partition is consumed by exactly one consumer instance in the group.
E.g. if your topic contains 2 partitions and you start a consumer group group-A with 2 consumer instances, then each of them will consume messages from one particular partition of the topic.
If you start the same 2 consumers with different group ids, group-A and group-B, then messages from both partitions of the topic will be broadcast to each of them. In that case the consumer instance running under group-A will receive messages from both partitions of the topic, and the same is true for group-B.
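To make the same-group vs. different-group behaviour concrete, here is a small self-contained Java sketch. It is a simplified, hypothetical model loosely based on the range assignment strategy (the high-level consumer's default partition.assignment.strategy), not Kafka's actual code, and the consumer names are made up. Within one group the partitions are split among the members; each group runs its own assignment over all partitions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GroupAssignmentSketch {

    // Simplified range-style assignment within ONE group (hypothetical model):
    // sort the member ids, then hand each member a contiguous run of partitions.
    // The first (numPartitions % members) members get one extra partition.
    static Map<String, List<Integer>> rangeAssign(List<String> consumerIds, int numPartitions) {
        Map<String, List<Integer>> result = new TreeMap<>();
        if (consumerIds.isEmpty()) {
            return result; // no members, nothing to assign
        }
        List<String> sorted = new ArrayList<>(consumerIds);
        Collections.sort(sorted);
        int perConsumer = numPartitions / sorted.size();
        int withExtra = numPartitions % sorted.size();
        for (int pos = 0; pos < sorted.size(); pos++) {
            int start = perConsumer * pos + Math.min(pos, withExtra);
            int count = perConsumer + (pos < withExtra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int p = start; p < start + count; p++) {
                parts.add(p);
            }
            result.put(sorted.get(pos), parts);
        }
        return result;
    }

    public static void main(String[] args) {
        int partitions = 2;
        // Both consumers in group-A: the 2 partitions are split between them.
        System.out.println("same group:   " + rangeAssign(List.of("consumer1", "consumer2"), partitions));
        // Different groups: each group assigns ALL partitions among its own members,
        // so a lone member of each group gets both partitions.
        System.out.println("group-A only: " + rangeAssign(List.of("consumer1"), partitions));
        System.out.println("group-B only: " + rangeAssign(List.of("consumer2"), partitions));
    }
}
```

In this model the shared group yields one partition per consumer, while the two separate groups each hand both partitions to their single member, which is the broadcast behaviour described above.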
Read more about this in the Kafka documentation.
EDIT: Based on your comment, which says:
I was wondering what is the effective difference between having 2 consumer threads under the same process as opposed to 2 consumer processes (group being the same in both cases)
The consumer group-id is global across the cluster. Suppose you start process-one with 2 threads and then spawn another process (maybe on a different machine) with the same groupId and 2 more threads; Kafka will add these 2 new threads to the set of threads consuming from the topic. So eventually there will be 4 threads responsible for consuming from the same topic (with only 2 partitions, at most 2 of them will actually receive messages, since each partition goes to exactly one thread in the group). Kafka will then trigger a rebalance to re-assign partitions to threads, so a partition that was being consumed by thread T1 of process P1 may be reassigned to thread T2 of process P2. The following lines are taken from the wiki page:
When a new process is started with the same Consumer Group name, Kafka will add that processes' threads to the set of threads available to consume the Topic and trigger a 're-balance'. During this re-balance Kafka will assign available partitions to available threads, possibly moving a partition to another process. If you have a mixture of old and new business logic, it is possible that some messages go to the old logic.
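The rebalance described above can be sketched the same way. Again this is a hypothetical, simplified range-style model rather than Kafka's implementation, and the thread ids (procB-0, procA-1, etc.) are made up; the real high-level consumer generates its own ids (derived from things like the host name), so you do not control how they sort. What it illustrates is that the assignment only sees a flat list of thread ids: whether two threads live in the same process or in different processes does not matter, and with 2 partitions and 4 threads, two threads get nothing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RebalanceSketch {

    // Simplified range-style assignment over a flat list of thread ids (hypothetical model).
    // Process boundaries are invisible here: only the sorted thread ids matter.
    static Map<String, List<Integer>> rangeAssign(List<String> threadIds, int numPartitions) {
        Map<String, List<Integer>> result = new TreeMap<>();
        if (threadIds.isEmpty()) {
            return result; // no threads, nothing to assign
        }
        List<String> sorted = new ArrayList<>(threadIds);
        Collections.sort(sorted);
        int perThread = numPartitions / sorted.size();
        int withExtra = numPartitions % sorted.size();
        for (int pos = 0; pos < sorted.size(); pos++) {
            int start = perThread * pos + Math.min(pos, withExtra);
            int count = perThread + (pos < withExtra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int p = start; p < start + count; p++) {
                parts.add(p);
            }
            result.put(sorted.get(pos), parts); // an empty list means the thread sits idle
        }
        return result;
    }

    public static void main(String[] args) {
        int partitions = 2;
        // First process (hypothetical ids) starts with 2 threads.
        List<String> threads = new ArrayList<>(List.of("procB-0", "procB-1"));
        System.out.println("before: " + rangeAssign(threads, partitions));
        // A second process joins the same group with 2 more threads -> rebalance.
        threads.addAll(List.of("procA-0", "procA-1"));
        System.out.println("after:  " + rangeAssign(threads, partitions));
    }
}
```

Here the newly joined process's thread ids happen to sort first, so both partitions move to procA while procB's threads become idle; with other ids the partitions could just as well stay where they were.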