Search code examples
pythonapache-kafkatwisted

Communicating with a Kafka server from a Python app using the Twisted framework


I have an app written using the Twisted framework, that communicates various data (mostly - log entries in JSON format) to a variety of logging servers using output plug-ins. I would like to make it able to send the data to a Kafka server, too - but I am hitting some kind of problem that I don't know how to solve.

If I send data to the Kafka server in a straightforward way using Python and the kafka-python module, everything works just fine:

from json import dumps
from kafka import KafkaProducer

site = '<KAFKA SERVER>'
port = 9092
username = '<USERNAME>'
password = '<PASSWORD>'
topic = 'test'

producer = KafkaProducer(
    bootstrap_servers='{}:{}'.format(site, port),
    value_serializer=lambda v: bytes(str(dumps(v)).encode('utf-8')),
    sasl_mechanism='SCRAM-SHA-256',
    security_protocol='SASL_SSL',
    sasl_plain_username=username,
    sasl_plain_password=password
)

event = {
    'message': 'Test message'
}

try:
    producer.send(topic, event)
    producer.flush()
    print('Message sent.')
except Exception as e:
    print('Error producing message: {}'.format(e))
finally:
    producer.close()

However, if I try to send it from my actual Twisted app, using pretty much the same code, it hangs:

from json import dumps

from core import output
from kafka import KafkaProducer
from twisted.python.log import msg

class Output(output.Output):

    def start(self):
        site = '<KAFKA SERVER>'
        port = 9092
        username = '<USERNAME>'
        password = '<PASSWORD>'
        self.topic = 'test'
        self.producer = KafkaProducer(
            bootstrap_servers='{}:{}'.format(site, port),
            value_serializer=lambda v: bytes(str(dumps(v)).encode('utf-8')),
            sasl_mechanism='SCRAM-SHA-256',
            security_protocol='SASL_SSL',
            sasl_plain_username=username,
            sasl_plain_password=password
        )

    def stop(self):
        self.producer.flush()
        self.producer.close()

    def write(self, event):
        try:
            self.producer.send(self.topic, event)
            self.producer.flush()
        except Exception as e:
            msg('Kafka error: {}'.format(e))

(This won't run out-of-the box; it's just the plug-in and uses a generic Output class defined elsewhere.)

In particular, it hangs at the self.producer.send(self.topic, event) line.

I think that the problem comes from the fact that the Kafka producer in kafka-python is synchronous (blocking) and Twisted requires asynchronous (non-blocking) code. There is an asynchronous Kafka module, called afkak - but it doesn't seem to provide authentication with the Kafka server, so it is not suitable for my needs.

The way I understand it, the way to get around such problems in Twisted is to use deferreds. However, I have been unable to understand how exactly to do it. If I rewrite the write method like this

    def write(self, event):
        d = threads.deferToThread(self.postentry, event)
        d.addCallback(self.postentryCallback)
        return d

    def postentryCallback(self):
        reactor.stop()

    def postentry(self, event):
        try:
            self.producer.send(self.topic, event)
            self.producer.flush()
        except Exception as e:
            msg('Kafka error: {}'.format(e))

it no longer hangs when trying to send data to the Kafka server - but it hangs when the application terminates and nothing is send to the Kafka server anyway (which I can verify with a separate, consumer script, written in pure Python).

Any ideas what I am doing wrong and how to fix it?

Twisted has an asynchronous implementation of some typical otherwise blocking code, like making GET/POST requests to a web server - but Kafka is not a web server, it runs on port 9092, so I don't think that I can use that.


Solution

  • aiokafka is supposed to be a Python module for asynchronous communication with a Kafka server - but I couldn't make it work.

    Eventually, I solved my problem with the confluent-kafka module. Unfortunately, it is not a drop-in replacement for the kafka-python module, so the code had to be rewritten a bit:

    from json import dumps
    
    from core import output
    from confluent_kafka import Producer
    from twisted.python.log import msg
    
    
    class Output(output.Output):
    
        def start(self):
            site = <KAFKA SERVER>
            port = 9092
            username = '<USERNAME>'
            password = '<PASSWORD>'
            self.topic = 'test'
            self.producer = Producer({
                'bootstrap.servers': '{}:{}'.format(site, port),
                'sasl.mechanism': 'SCRAM-SHA-256',
                'security.protocol': 'SASL_SSL',
                'sasl.username': username,
                'sasl.password': password
            })
    
        def stop(self):
            self.producer.flush()
    
        def write(self, event):
            self.postentry(event)
    
        def delivery_callback(self, err, message):
            if err:
                msg('Kafka error: {}'.format(err))
    
        def postentry(self, event):
            try:
                self.producer.produce(
                    self.topic,
                    bytes(str(dumps(event)).encode('utf-8')),
                    callback=self.delivery_callback
                )
                self.producer.poll(0)
                self.producer.flush()
            except Exception as e:
                msg('Kafka error: {}'.format(e))