Tags: python, python-3.x, apache-kafka, kafka-consumer-api

What is the best practice for keeping a Kafka consumer alive in Python?


Something puzzles me about keeping consumers alive. Say I have a topic to which data is written almost constantly, but for one hour each day no new messages arrive. If I have set a timeout on my consumers, they will be closed during that hour because there is nothing to consume.

Then new messages arrive, but there are no consumers alive to consume them.

How should I handle such scenarios? My consumers may consume all messages and then be closed. What is the best way to keep them alive? Is there a way to start them automatically when new messages arrive? What are the best practices here?


Solution

  • Why not just

    from confluent_kafka import Consumer


    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'my-consumer-1',
        'auto.offset.reset': 'earliest'
    })
    consumer.subscribe(['topicName'])

    try:
        while True:
            # poll() blocks for up to 10 seconds and returns None if nothing arrived
            message = consumer.poll(10.0)

            if message is None:
                # Idle topic: just poll again, no extra sleep is needed
                continue

            if message.error():
                print(f"Consumer error: {message.error()}")
                continue

            print(f"Received message: {message.value().decode('utf-8')}")
    except KeyboardInterrupt:
        # Stop cleanly on Ctrl+C
        pass
    finally:
        # Close once, on shutdown, so the consumer leaves its group cleanly
        consumer.close()
        print("Goodbye")
    

    I cannot comment on the requirement of "setting a timeout for consumers", but in most cases consumers are expected to run "forever". An idle topic is not a reason to close them: a poll that returns nothing can simply be followed by another poll. They should also be deployed in consumer groups so that they remain highly available.
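
One way to make a "run forever" consumer easier to operate is to let it stop only when asked to, for example on SIGINT or SIGTERM from a process supervisor. The following is a minimal sketch of that idea, not a definitive implementation. The names `run_consumer`, `install_signal_handlers` and `handle` are hypothetical helpers introduced for illustration, and `consumer` is assumed to be any object with `poll(timeout)` and `close()` methods, such as a `confluent_kafka.Consumer`.

```python
import signal
import threading


def run_consumer(consumer, handle, stop_event, poll_timeout=1.0):
    """Poll until stop_event is set, then close the consumer exactly once.

    Assumptions: `consumer` exposes poll(timeout) and close(), and messages
    expose error(); `handle` is a hypothetical per-message callback.
    Returns the number of messages passed to `handle`.
    """
    processed = 0
    try:
        while not stop_event.is_set():
            message = consumer.poll(poll_timeout)
            if message is None:
                # Nothing arrived within the timeout: keep polling
                continue
            if message.error():
                print(f"Consumer error: {message.error()}")
                continue
            handle(message)
            processed += 1
    finally:
        # Runs on normal stop and on unexpected exceptions alike
        consumer.close()
    return processed


def install_signal_handlers(stop_event):
    """Request a clean stop on SIGINT/SIGTERM instead of dying mid-message."""
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, lambda signum, frame: stop_event.set())


# Typical wiring (not executed here because it needs a running broker):
#   stop = threading.Event()
#   install_signal_handlers(stop)
#   run_consumer(consumer, lambda m: print(m.value()), stop)
```

A short `poll_timeout` keeps the loop responsive to the stop request. Restarting the process after a crash is left to whatever supervises it (systemd, Docker, Kubernetes or similar), which is an assumption about the deployment rather than something the answer above specifies.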