I'm trying to create a Kafka producer with SSL. I need information on how to set the SSL parameters in the constructor; the documentation provided with the kafka-python client is not descriptive enough.
What are the ssl_certfile, ssl_cafile, and ssl_keyfile parameters? I'm not sure where to look for these files.
import json
from kafka import KafkaProducer

# kafka_broker holds the broker address(es) and is defined elsewhere.
producer = KafkaProducer(bootstrap_servers=kafka_broker,
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                         security_protocol='SSL',
                         api_version=(0, 10),
                         ssl_cafile='ca-certs.pem', ssl_certfile='server.pem',
                         ssl_keyfile='server.pem', ssl_password='xxx')
producer.send('rk976772_topic', {"test": 0})
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python2.7/site-packages/kafka/producer/kafka.py", line 543, in send
    self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
  File "/usr/lib/python2.7/site-packages/kafka/producer/kafka.py", line 664, in _wait_on_metadata
    "Failed to update metadata after %.1f secs." % max_wait)
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
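The traceback shows the producer giving up while waiting for topic metadata (max_block_ms), which can happen when the connection to the broker is never fully established; a failed TLS handshake is one possible cause, though not the only one. A rough way to check the certificate files independently of kafka-python is to attempt the handshake with Python's standard ssl module. This is only a sketch for Python 3: split_broker and try_handshake are hypothetical helper names, and the broker address and file names in the usage line are placeholders.

```python
import socket
import ssl


def split_broker(address):
    """Split a 'host:port' bootstrap address into (host, port)."""
    host, _, port = address.rpartition(':')
    return host, int(port)


def try_handshake(host, port, cafile, certfile=None, keyfile=None,
                  password=None, timeout=5.0):
    """Attempt a TLS handshake and return (ok, detail) instead of raising."""
    try:
        # cafile: CA certificate(s) used to verify the broker's certificate.
        context = ssl.create_default_context(cafile=cafile)
        if certfile:
            # Client certificate and private key, only for mutual TLS.
            context.load_cert_chain(certfile, keyfile, password)
        with socket.create_connection((host, port), timeout=timeout) as sock:
            with context.wrap_socket(sock, server_hostname=host) as tls:
                return True, tls.version()
    except OSError as exc:
        # In Python 3, OSError covers ssl.SSLError, missing files,
        # refused connections, and socket timeouts.
        return False, '{}: {}'.format(type(exc).__name__, exc)
```

For example, print(try_handshake(*split_broker('broker.example.com:9093'), cafile='ca-certs.pem')) reports either the negotiated TLS version or the error that stopped the handshake.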
I had to publish the message over SASL_SSL. I used the code below to create a producer with the SASL_SSL protocol.
import json
from kafka import KafkaProducer
# environment_params and env are defined elsewhere (not shown).
security_protocol = environment_params.kafka_security_protocol
if env == 'dev':
    if security_protocol == 'SASL_SSL':
        producer = KafkaProducer(bootstrap_servers=environment_params.dev_kafka_broker,
                                 value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                                 security_protocol=security_protocol,
                                 ssl_cafile='ca-certs.pem', sasl_mechanism='GSSAPI',
                                 api_version=environment_params.dev_kafka_api_version)
    elif security_protocol == 'PLAINTEXT':
        producer = KafkaProducer(bootstrap_servers=environment_params.dev_kafka_broker,
                                 value_serializer=lambda v: json.dumps(v).encode('utf-8'))
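As for the three parameters in the question, as far as I understand them: ssl_cafile is the CA certificate file used to verify the broker's certificate, ssl_certfile is the client's own certificate in PEM format, and ssl_keyfile is the client's private key (ssl_password unlocks it if it is encrypted). The client certificate and key should only be needed when the broker requires client authentication, and the files themselves normally come from whoever administers the cluster. A minimal sketch for plain SSL without SASL follows; build_ssl_config and make_producer are hypothetical helper names, not part of kafka-python, and the file names are placeholders.

```python
import json


def build_ssl_config(bootstrap_servers, cafile, certfile=None, keyfile=None,
                     password=None):
    """Return keyword arguments for KafkaProducer over SSL (no SASL)."""
    config = {
        'bootstrap_servers': bootstrap_servers,
        'security_protocol': 'SSL',
        'value_serializer': lambda v: json.dumps(v).encode('utf-8'),
        # CA certificate(s) used to verify the broker's certificate.
        'ssl_cafile': cafile,
    }
    # The client certificate and key are assumed to be optional: they are
    # only passed when the broker requires client authentication.
    if certfile:
        config['ssl_certfile'] = certfile
    if keyfile:
        config['ssl_keyfile'] = keyfile
    if password:
        config['ssl_password'] = password
    return config


def make_producer(config):
    """Create the producer; kafka-python is imported only when needed."""
    from kafka import KafkaProducer
    return KafkaProducer(**config)
```

With this, make_producer(build_ssl_config(['broker.example.com:9093'], 'ca-certs.pem')) covers the case where only the broker's certificate has to be verified; pass certfile, keyfile, and password as well if the broker asks for a client certificate.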