python, ssl, apache-kafka, kafka-consumer-api, sasl

python confluent kafka: Group authorization failed


How can I fix a "Group authorization failed" error when trying to consume messages from Kafka using the Python client?

The same settings work fine in the Kafka CLI.

Usually this error should point to invalid permissions. However, since the CLI works, I suspect it must be something else.

My Kafka instance is SASL/SSL enabled. The following settings are used:

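from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer

# Schema Registry connection used by the Avro deserializer
src_schema_registry_conf = {
    'url': 'http://<<host>>:8081'
}

src_schema_registry_client = SchemaRegistryClient(src_schema_registry_conf)
string_deserializer = StringDeserializer('utf_8')
avro_deserializer = AvroDeserializer(src_schema_registry_client)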

src_conf = {
    'bootstrap.servers': '<<host>>:9093',
    'ssl.ca.location': 'certs/catrust.pem',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': '<<username>>',
    'sasl.password': '<<secret_password>>',
    'key.deserializer': string_deserializer,
    'value.deserializer': avro_deserializer,
    'group.id': 'test-consumer-group',
    'auto.offset.reset': 'earliest'
}

The messages are consumed with:

consumer = DeserializingConsumer(src_conf)
consumer.subscribe(['<<topic>>'])

while True:
    try:
        # SIGINT can't be handled when polling, limit timeout to 1 second.
        msg = consumer.poll(1.0)
        if msg is None:
            continue

        user = msg.value()
        if user is not None:
            print("record {}: value: {}\n"
                  .format(msg.key(), user))
    except KeyboardInterrupt:
        break

consumer.close()

I was trying to follow the example here: https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/avro_consumer.py but cannot get it to work in my environment.


Solution

  • It turns out that the answer is simpler than expected:

    the username must be written in all caps. Then it is accepted.
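
    As a rough illustration of that fix, here is a minimal sketch that builds the SASL settings from the question with the username upper-cased. The helper name build_consumer_conf and the sample values (svc_reader, localhost:9093, the dummy password) are made up for this example, and it assumes the broker-side permissions are defined for the upper-case form of the name, as was the case here. The deserializer entries from the question are left out so the snippet runs without a Schema Registry.

```python
def build_consumer_conf(username, password, bootstrap_servers, group_id,
                        ca_location="certs/catrust.pem"):
    """Return SASL_SSL/PLAIN consumer settings with the username upper-cased.

    Hypothetical helper. Assumption: the broker only authorizes the
    upper-case spelling of the username.
    """
    return {
        "bootstrap.servers": bootstrap_servers,
        "ssl.ca.location": ca_location,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        # The fix from the answer: send the username in upper case.
        "sasl.username": username.upper(),
        # The password is passed through unchanged.
        "sasl.password": password,
        "group.id": group_id,
        "auto.offset.reset": "earliest",
    }


# Example values are placeholders, not real credentials or hosts.
conf = build_consumer_conf(
    "svc_reader", "not-a-real-secret", "localhost:9093", "test-consumer-group"
)
print(conf["sasl.username"])
```

    The resulting dict can be extended with the key.deserializer and value.deserializer entries from the question before it is passed to DeserializingConsumer. Whether upper-casing is right for another cluster depends on how its users and ACLs are defined, so check the exact spelling with whoever administers the broker.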