I'm trying to consume a kafka topic in python and serve over http using the prometheus client, but I seem to be blocked on the topic consumption. I put some place holders to simply add metrics, but it looks like that part is being blocked.
import os
from pykafka import KafkaClient
import threading
from kafka import KafkaConsumer
from prometheus_client import start_http_server, Metric, REGISTRY
class CustomCollector(threading.Thread):
daemon = True
def collect(self):
client = KafkaClient(hosts=os.environ['KAFKA_ADDRESS'])
topic = client.topics[b'os.environ['KAFKA_TOPIC']
consumer = topic.get_simple_consumer()
for message in consumer:
if message is not None:
print(message.value)
metric = Metric('test_name', 'description', 'summary')
metric.add_sample('test_name', 'description', 'summary')
yield metric
if __name__ == '__main__':
start_http_server(9998)
REGISTRY.register(CustomCollector())
while True: time.sleep(1)
If I run the code, I see the topic data being streamed to console as expected. However, my metric endpoint is never populated and any request to the web server just hangs until I kill the app, to which it responds with the standard metrics from the library.
The construction of a Metric
instance should happen once per consumed message. That is, the Metric()
call should be inside of the for message in consumer
loop. Also, you probably want to use message.value
in some way when creating the Metric
instance.