python apache-kafka twisted confluent-kafka-python

Twisted: issues integrating the blocking confluent-kafka Python library


Let's consider this piece of code:

from twisted.web import server, resource
from twisted.internet import reactor, threads
from twisted.internet.task import LoopingCall
from confluent_kafka import Consumer, KafkaError
import json


# Function to handle Kafka consumer
def kafka_consumer():
    def fetch_data():
        def poll_kafka():
            msg = consumer.poll(0.1)
            if msg is None:
                return
            if msg.error():
                # _PARTITION_EOF is defined on KafkaError, not KafkaException
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    return
                else:
                    return
            else:
                print("message", msg, msg.value())
                consumer.commit()  # Manually commit the offset

        # Execute Kafka polling in a separate thread
        d1 = threads.deferToThread(poll_kafka)

    def start_loop():
        lc = LoopingCall(fetch_data)
        lc.start(0.5) 

    conf = {
        'bootstrap.servers': 'kafka_internal-1:29093',
        'group.id': 'your_consumer_group-2',  
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False  # Disable autocommit
    }

    consumer = Consumer(conf)
    consumer.subscribe(['jithin_test'])  # <-- is it a blocking call??

    start_loop()

# Web service handler
class WebService(resource.Resource):
    isLeaf = True

    def render_GET(self, request):
        # You can customize the response according to your needs
        response = {
            'message': 'Welcome to the Kafka consumer service!'
        }
        return json.dumps(response).encode('utf-8')


if __name__ == '__main__':
    reactor.callWhenRunning(kafka_consumer)

    # Run the Twisted web service
    root = WebService()
    site = server.Site(root)
    reactor.listenTCP(8180, site)
    reactor.run()

Here I instantiate the Consumer object from confluent-kafka in the reactor thread, then leave the subsequent poll() calls to deferToThread(). I have a few questions about this:

  1. Is Consumer.subscribe() a blocking call? Should I run it through deferToThread() as well?
  2. Will the consumer be corrupted if a consumer rebalance happens while I fire another poll_kafka call through deferToThread()? (As I understand it, each deferToThread() call runs on whichever thread the thread pool hands out, so there is no guarantee that the same thread is used every time.)
  3. If so, is there a way to manage this? Maybe by running the whole thing in a separate Python thread and passing the consumed values back to the Twisted application?
  4. Or is there a way to reuse the consumer object without corrupting it?

NB: the code is written in Python 2. It is an integration with a legacy system; porting the whole thing is not possible at the moment, and most of the other available libraries support only Python 3+.


Solution

  • If you want to defer work to a thread other than the reactor thread, I would recommend using deferToThreadPool (https://docs.twistedmatrix.com/en/stable/api/twisted.internet.threads.html#deferToThreadPool) with a custom ThreadPool (https://docs.twistedmatrix.com/en/stable/api/twisted.python.threadpool.ThreadPool.html) that you manage yourself, created with minthreads=1, maxthreads=1. Such a pool has exactly one worker thread, so every call you hand to it runs on that same thread, one at a time.

    You do have to do a bit of annoying lifecycle management for this thread pool, but in this simple example that is just calling .start() in kafka_consumer and registering .stop() with reactor.addSystemEventTrigger("before", "shutdown", ...).