apache-spark, pyspark, apache-kafka, spark-structured-streaming, spark-kafka-integration

How to writeStream to a specific Kafka cluster in Azure Databricks? "Topic mytopic not present in metadata after 60000 ms."


I am trying to write data to Kafka with the writeStream method. The source system gave me the following properties:

topic = 'mytopic'
host = "myhost.us-west-1.aws.confluent.cloud:9092"
userid = 'myuser'
password = 'mypassword'
Cluster = 'cluster-numeric-test-03'

While I found many examples on Stack Overflow that use the topic, host, user ID and password, I can barely find any documentation on the cluster. I tried the following parameter, which I found in this part of the documentation: "spark.kafka.clusters.cluster.auth.bootstrap.servers" https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#configuration

I am therefore using the code below to connect:

(
    df.select("key", "value", "partition")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", host)
    .option("topic", topic)
    .trigger(availableNow=True)
    .option(
        "kafka.sasl.jaas.config",
        'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{}" password="{}";'.format(userid, password),
    )
    .option("checkpointLocation", "/mnt/Sales/Markerting/Whiteboards/temp/")
    .option("spark.kafka.clusters.cluster.auth.bootstrap.servers", Cluster)
    .start()
)

I am getting the following error:

kafkashaded.org.apache.kafka.common.errors.TimeoutException: Topic mytopic not present in metadata after 60000 ms.


Solution

  • I was actually missing two options:

    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")

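    With these two options it works without adding the cluster details. As far as I understand, the Kafka client defaults to security.protocol=PLAINTEXT, while Confluent Cloud expects SASL over TLS, so without them the producer could not fetch the topic metadata and timed out. According to the Spark documentation, the spark.kafka.clusters.<cluster>.auth.bootstrap.servers setting is only used to obtain delegation tokens, so it is not needed here.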

    I am now using the code below to connect:

        (
            df.select("key", "value", "partition")
            .writeStream
            .format("kafka")
            .option("kafka.bootstrap.servers", host)
            .option("topic", topic)
            .trigger(availableNow=True)
            .option(
                "kafka.sasl.jaas.config",
                'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{}" password="{}";'.format(userid, password),
            )
            .option("checkpointLocation", "/mnt/Sales/Markerting/Whiteboards/temp/")
            .option("kafka.security.protocol", "SASL_SSL")
            .option("kafka.sasl.mechanism", "PLAIN")
            .start()
        )
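
If the same connection settings are needed in several streams, one option is to collect them in a plain dict and pass them to the writer in one go. The snippet below is only a minimal sketch of that idea: the helper name confluent_sasl_options and its shaded parameter are hypothetical (not part of Spark), and it assumes the kafkashaded. class prefix is what Databricks expects, as in the code above, while open-source Spark would use the unprefixed org.apache.kafka class name.

```python
def confluent_sasl_options(host, userid, password, shaded=True):
    """Build Kafka writer options for a SASL_SSL / PLAIN cluster.

    Hypothetical helper, not a Spark API. shaded=True keeps the
    'kafkashaded.' class prefix used in the Databricks code above;
    shaded=False assumes an unshaded Kafka client.
    """
    prefix = "kafkashaded." if shaded else ""
    jaas = (
        f"{prefix}org.apache.kafka.common.security.plain.PlainLoginModule "
        f'required username="{userid}" password="{password}";'
    )
    return {
        "kafka.bootstrap.servers": host,
        # The two settings that were missing in the question:
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas,
    }


options = confluent_sasl_options(
    "myhost.us-west-1.aws.confluent.cloud:9092", "myuser", "mypassword"
)
for key, value in options.items():
    # Mask the JAAS line so the password is not printed.
    print(key, "=", "***" if key.endswith("jaas.config") else value)
```

The dict can then replace the individual connection options, for example df.writeStream.format("kafka").options(**options).option("topic", topic), followed by the same trigger, checkpointLocation and start() calls as above.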