python, apache-kafka, prometheus, pykafka

How to consume a Kafka topic and parse/serve it over HTTP?


I'm trying to consume a Kafka topic in Python and serve it over HTTP using the Prometheus client, but I seem to be blocked on the topic consumption. I put in some placeholders that simply add metrics, but it looks like that part is being blocked.

import os
import threading
import time

from pykafka import KafkaClient
from prometheus_client import start_http_server, Metric, REGISTRY

class CustomCollector(threading.Thread):
    daemon = True

    def collect(self):
        client = KafkaClient(hosts=os.environ['KAFKA_ADDRESS'])
        topic = client.topics[os.environ['KAFKA_TOPIC'].encode()]
        consumer = topic.get_simple_consumer()
        for message in consumer:
            if message is not None:
                print(message.value)

        metric = Metric('test_name', 'description', 'summary')
        metric.add_sample('test_name', labels={}, value=1.0)  # placeholder: add_sample(name, labels, value)
        yield metric

if __name__ == '__main__':
    start_http_server(9998)
    REGISTRY.register(CustomCollector())
    while True: time.sleep(1)

If I run the code, I see the topic data being streamed to the console as expected. However, my metrics endpoint is never populated, and any request to the web server just hangs until I kill the app, at which point it responds with the standard metrics from the library.


Solution

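  • By default the pykafka consumer keeps waiting for new messages, so the for message in consumer loop does not finish, the Metric code after it is never reached, and collect() never yields anything. The construction of a Metric instance should happen once per consumed message. That is, the Metric() call should be inside the for message in consumer loop. Also, you probably want to use message.value in some way when creating the Metric instance.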
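
As a rough illustration of keeping the blocking loop away from the HTTP side, here is a minimal sketch of a different arrangement from the one described above: the consumer runs in a daemon thread and only updates shared counters, while collect() reads those counters and returns immediately. The names MessageStats, start_consumer_thread, KafkaStatsCollector and the two metric names are hypothetical and chosen for this example. The environment variables and port 9998 are taken from the question. The third-party imports are kept inside the functions that need them so the counting logic can be tried without a Kafka broker.

```python
import threading


class MessageStats:
    """Thread-safe counters shared by the consumer thread and the collector (hypothetical helper)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._count = 0
        self._bytes = 0

    def record(self, value):
        # Called once per consumed message; value is the raw message payload.
        with self._lock:
            self._count += 1
            self._bytes += len(value or b'')

    def snapshot(self):
        # Returns (message count, total payload bytes) without blocking for long.
        with self._lock:
            return self._count, self._bytes


def start_consumer_thread(messages, stats):
    """Drain any iterable of messages in a daemon thread so the caller is not blocked."""

    def run():
        for message in messages:
            if message is not None:
                stats.record(message.value)

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread


class KafkaStatsCollector:
    """Custom collector whose collect() only reads counters and returns right away."""

    def __init__(self, stats):
        self.stats = stats

    def collect(self):
        from prometheus_client.core import CounterMetricFamily

        count, size = self.stats.snapshot()
        # Metric names below are assumptions made for this sketch.
        yield CounterMetricFamily(
            'kafka_messages_consumed', 'Messages consumed from the topic', value=count)
        yield CounterMetricFamily(
            'kafka_message_bytes', 'Total payload bytes consumed', value=size)


def main():
    import os
    import time

    from pykafka import KafkaClient
    from prometheus_client import REGISTRY, start_http_server

    client = KafkaClient(hosts=os.environ['KAFKA_ADDRESS'])
    topic = client.topics[os.environ['KAFKA_TOPIC'].encode()]

    stats = MessageStats()
    start_consumer_thread(topic.get_simple_consumer(), stats)

    REGISTRY.register(KafkaStatsCollector(stats))
    start_http_server(9998)
    while True:
        time.sleep(1)
```

Calling main() with KAFKA_ADDRESS and KAFKA_TOPIC set should start the consumer and the metrics endpoint side by side. What to derive from message.value (here just its length) depends on your data, so treat the two counters as placeholders.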