I'm using a pykafka group consumer with gevent, but the consumers receive duplicate data. Here is my code:
import gevent
from pykafka import KafkaClient
topic_name = 'test2'
bootstrap_servers = '192.168.199.228:9094,192.168.199.228:9092,192.168.199.228:9093'
group = 'test_g'
def get_consumer():
    client = KafkaClient(hosts=bootstrap_servers, use_greenlets=True)
    topic = client.topics[topic_name.encode()]
    consumer = topic.get_simple_consumer(auto_commit_interval_ms=10000,
                                         consumer_group=group.encode(),
                                         auto_commit_enable=True)
    return consumer

def worker(worker_id):
    consumer = get_consumer()
    for msg in consumer:
        print('worker {} partition: {}, offset: {}'.format(worker_id, msg.partition, msg.offset))

if __name__ == '__main__':
    tasks = [gevent.spawn(worker, i) for i in range(3)]
    gevent.joinall(tasks)
Result: every worker receives the same messages. Can anyone tell me how to make this work? Does pykafka not support gevent?
I'm betting that this issue doesn't have anything to do with your use of gevent. The reason you're noticing duplicated data across consumers is that you're using a SimpleConsumer instead of a BalancedConsumer. SimpleConsumer does not perform automatic balancing - it simply consumes the entire topic from its starting offset. Thus if you have many SimpleConsumer instances running side by side, as you do here, each one will consume the entire topic from its starting offset. BalancedConsumer (topic.get_balanced_consumer(consumer_group='mygroup')) is probably what you want here. It uses a consumer rebalancing algorithm to ensure that consumers running in the same group don't receive the same messages. For this to work, your topic needs to have at least as many partitions as you have processes consuming it. See the pykafka README and documentation for more information.
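
For reference, here is a minimal sketch of how your worker could be adapted, reusing the topic, group, and broker addresses from the question. The managed=True flag is an assumption on my part - it delegates group coordination to the Kafka brokers (requires Kafka 0.9+); the ZooKeeper-based default would instead need a zookeeper_connect argument pointing at your ZooKeeper ensemble:

import gevent
from pykafka import KafkaClient

topic_name = 'test2'
bootstrap_servers = '192.168.199.228:9094,192.168.199.228:9092,192.168.199.228:9093'
group = 'test_g'

def get_consumer():
    client = KafkaClient(hosts=bootstrap_servers, use_greenlets=True)
    topic = client.topics[topic_name.encode()]
    # A balanced consumer coordinates with the other members of its group
    # so that each partition is owned by exactly one consumer at a time.
    # managed=True (assumed here) uses Kafka's broker-side group
    # coordination; without it, pykafka balances via ZooKeeper.
    return topic.get_balanced_consumer(consumer_group=group.encode(),
                                       managed=True,
                                       auto_commit_enable=True,
                                       auto_commit_interval_ms=10000)

def worker(worker_id):
    consumer = get_consumer()
    for msg in consumer:
        print('worker {} partition: {}, offset: {}'.format(
            worker_id, msg.partition_id, msg.offset))

if __name__ == '__main__':
    tasks = [gevent.spawn(worker, i) for i in range(3)]
    gevent.joinall(tasks)

With three partitions on test2, each greenlet should end up owning one partition; with fewer partitions than consumers, the extra consumers will sit idle rather than receive duplicates.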