Search code examples
pythonpython-3.xapache-kafkageventpykafka

How to use pykafka group consumer with gevent?


I use pykafka group consumer with gevent, but the results have repeating data. show my code:

import gevent
from pykafka import KafkaClient

topic_name = 'test2'
bootstrap_servers = '192.168.199.228:9094,192.168.199.228:9092,192.168.199.228:9093'
group = 'test_g'


def get_consumer():
    client = KafkaClient(hosts=bootstrap_servers, use_greenlets=True)
    topic = client.topics[topic_name.encode()]

    consumer = topic.get_simple_consumer(auto_commit_interval_ms=10000,
                                     consumer_group=group.encode(),
                                     auto_commit_enable=True,
                                     )
    return consumer


def worker(worker_id):
    consumer = get_consumer()
    for msg in consumer:
        print('worker {} partition: {}, offset: {}'.format(worker_id, msg.partition, msg.offset))


if __name__ == '__main__':
    tasks = [gevent.spawn(worker, *(i, )) for i in range(3)]
    ret = gevent.joinall(tasks)

reulst: Anyone can tell me how to make it work, does pykafka not support gevent?


Solution

  • I'm betting that this issue doesn't have anything to do with your use of gevent. The reason you're noticing duplicated data across consumers is that you're using a SimpleConsumer instead of a BalancedConsumer. SimpleConsumer does not perform automatic balancing - it simply consumes the entire topic from its starting offset. Thus if you have many SimpleConsumer instances running side by side as you do here, each one will consume the entire topic from its starting offset. BalancedConsumer (topic.get_balanced_consumer(consumer_group='mygroup')) is probably what you want here. It uses a consumer rebalancing algorithm to ensure that consumers running in the same group don't receive the same messages. For this to work, your topic needs to have at least as many partitions as processes you have consuming it. See the pykafka README and documentation for more information.