Search code examples
pythongoogle-cloud-platformapache-kafkaapache-beamconfluent-platform

kerberos error while authenticating on Confluent Kafka


I´ve been trying to understand apache beam, confluent kafka and dataflow integration with python 3.8 and beam sdk 2.7 the desire result is to build a pipeline (which is going to be ran on dataflow) which consumes from confluent kafka and and just logs the messages on gcp.(I'm using JDK 17 btw)

This is the code I´m using:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
import os
import json
import logging

os.environ['GOOGLE_APPLICATION_CREDENTIALS']='credentialsOld.json'

with open('cluster.configuration.json') as cluster:
    data=json.load(cluster)
    cluster.close()

def logger(element):
    logging.INFO('Something was found')  

config={
        "bootstrap.servers":data["bootstrap.servers"],
        "security.protocol":"SASL_SSL",
        "sasl.mechanisms":"PLAIN",
        "session.timeout.ms":data["session.timeout.ms"],
        "group.id":"tto",
        "sasl.jaas.config":f'org.apache.kafka.common.security.plain.PlainLoginModule required serviceName="Kafka" username=\"{data["sasl.username"]}\" password=\"{data["sasl.password"]}\";',
        "auto.offset.reset":"earliest"
    }

 
def main():
    
    print('======================================================')
    beam_options = PipelineOptions(runner='DataflowRunner',project='project',experiments=['use_runner_v2'],streaming=True,save_main_session=True,job_name='kafka-stream-test')
    with beam.Pipeline(options=beam_options) as p:
        msgs = p | 'ReadKafka' >> ReadFromKafka(consumer_config=config,topics=['users'])
        msgs | beam.FlatMap(logger)
        
if __name__ == '__main__':    
    main()

I have tested this pipeline with dataflow but also with direct runner and on both runners I get a this error: "Timeout while fetching topic metadata".

This error seems to be caused because of the consumer being unable to authenticate to confluent kafka since I get these warnings:

WARNING:root:severity: WARN
timestamp {
  seconds: 1650473787
  nanos: 331000000
}
message: "[Consumer clientId=consumer-tto-1, groupId=tto] Bootstrap broker "Here is suposed to be mi broker ip but I wont show it" (id: -1 rack: null) disconnected"
instruction_id: "bundle_1"
transform_id: "ReadFromKafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/ParDo(GenerateKafkaSourceDescriptor)/ParMultiDo(GenerateKafkaSourceDescriptor)"
log_location: "org.apache.kafka.clients.NetworkClient"
thread: "21"

And after this warning I get this other warning:

message: "[Consumer clientId=consumer-tto-1, groupId=tto] Error connecting to node "Here is suposed to be mi broker ip but I wont show it" (id: -1 rack: null)"
trace: "java.io.IOException: Channel could not be created for socket java.nio.channels.SocketChannel[closed]\n\tat

Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator

Caused by: org.apache.kafka.common.KafkaException: Principal could not be determined from Subject, this may be a transient failure due to Kerberos re-login

Two important things are that I already build a consumer on python but without the ReadFromKafka apache beam io and it connects and consumes perfectly to the topic so the credentials I'm using are the same and I have the same protocol "SASL_SSL""PLAIN" (related to this also I don't have any idea why is a kerberos error popping since I'm not using kerberos authentication)... the other thing is that the transform 'ReadFromKafka' is used through a expansion service since this transform is only supported by java but with apache beam I can use it on Python.


Solution

  • Ok the mistake was really simple to fix, I had a typo in 'sasl.mechanisms' so the property wasn't getting recognized.

    Instead of sasl.mechanisms use sasl.mechanism.