How can I fix a: Group authorization failed
error when trying to consume messages from Kafka using the python client?
The same settings work fine in the Kafka CLI.
Usually this error hould point to invalid permissions, however, as the CLI works I suspect it must be something else.
My kafka instance is SASL/SSL enabled. The following settings are used:
src_schema_registry_conf = {
'url': 'http://<<host>>:8081'
}
src_schema_registry_client = SchemaRegistryClient(src_schema_registry_conf)
string_deserializer = StringDeserializer('utf_8')
avro_deserializer = AvroDeserializer(src_schema_registry_client)#,
src_conf = {
"bootstrap.servers": '<<host>>:9093',
"ssl.ca.location": 'certs/catrust.pem',
"security.protocol":"SASL_SSL",
"sasl.mechanism":"PLAIN",
"sasl.username":"<<username>",
"sasl.password":'<<secret_pasword',
'key.deserializer': string_deserializer,
'value.deserializer': avro_deserializer,
'group.id': "test-consumer-group",
'auto.offset.reset': "earliest"
}
Consumption of the messages is initiated from:
consumer = DeserializingConsumer(src_conf)
consumer.subscribe(['<<topic>>'])
while True:
try:
# SIGINT can't be handled when polling, limit timeout to 1 second.
msg = consumer.poll(1.0)
if msg is None:
continue
user = msg.value()
if user is not None:
print("record {}: value: {}\n"
.format(msg.key(), user))
except KeyboardInterrupt:
break
consumer.close()
I was trying to follow the exaple here: https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/avro_consumer.py but cannot get it to work in my environment.
It turns out that the answer is simpler than expected:
the username must be written in ALLCAPS
. Then it is accepted.