Search code examples
kafka-python

How to create Kafka-python producer with ssl configuration


I'm trying to create kafka producer with ssl. I need information on how to set SSL parameters in the constructor, the information provided in kafka-python client is not descriptive enough.

What are the ssl_certfile, ssl_cafile, ssl_keyfile parameters. I'm not sure where to look for these files.

producer = KafkaProducer(bootstrap_servers=kafka_broker,
  value_serializer=lambda v: json.dumps(v).encode('utf-8'),
  security_protocol='SSL',
  api_version=(0,10),
  ssl_cafile='ca-certs.pem',ssl_certfile='server.pem',
  ssl_keyfile='server.pem',ssl_password='xxx')
producer.send('rk976772_topic',{"test":0})

Traceback (most recent call last): File "", line 1, in File "/usr/lib/python2.7/site-packages/kafka/producer/kafka.py", line 543, in send self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0) File "/usr/lib/python2.7/site-packages/kafka/producer/kafka.py", line 664, in _wait_on_metadata "Failed to update metadata after %.1f secs." % max_wait) kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.


Solution

  • I had to publish the message over SASL_SSL Used below code to create a producer with SASL_SSL protocol.

    from kafka import KafkaProducer
    
    security_protocol=environment_params.kafka_security_protocol
    if env=='dev':
        if security_protocol=='SASL_SSL':
            producer = KafkaProducer(bootstrap_servers=environment_params.dev_kafka_broker,value_serializer=lambda v: json.dumps(v).encode('utf-8'),security_protocol=security_protocol,ssl_cafile='ca-certs.pem',sasl_mechanism='GSSAPI',api_version=environment_params.dev_kafka_api_version)
        elif security_protocol=='PLAINTEXT':
            producer = KafkaProducer(bootstrap_servers=environment_params.dev_kafka_broker,value_serializer=lambda v: json.dumps(v).encode('utf-8'))