Search code examples
pysparkdatabricksazure-databricksapache-kafka-streams

Unable to write Data from Kafka to Delta Live Table in Databricks


I have this code:

@dlt.table(  
    name="kafka_bronze",  
    table_properties={"pipelines.autoOptimize.enabled": "true"}  
)  
def kafka_bronze():  
    df = (spark.readStream   
        .format("kafka")   
        .option("kafka.bootstrap.servers", {Masked})  
        .option("subscribe", "topic1")    
        .option("startingOffsets", "earliest")   
        .option("maxOffsetsPerTrigger", 100)   
        .load()  
        .select(col("value").cast(StringType()).alias("json"))  
        .select(from_json("json", jsonSchema).alias("data"))  
        .select("data.*"))  
  
    return df  




However it fails and doesn't write any data. I can stream data into notebook successfully but with pipeline it's not loading the data. FYI I am using a unity catalog


Solution

  • The error you are getting says org.apache.spark.sql.streaming.StreamingQueryException, specifically mentioning a TimeoutException from Kafka, Indicates that your Spark streaming job is timing out while trying to communicate with the Kafka brokers.

    I agree with @JayashankarGS can happen due to reasons such as network issues Kafka broker overload, or incorrect Kafka configuration in your Spark job.

    The below code helps you increase the timeout settings for your Kafka:

    .option("kafka.consumer.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    

    I have tried the below code:

    def kafka_bronze():  
        df = (spark.readStream   
            .format("kafka")   
            .option("kafka.bootstrap.servers", "your_kafka_bootstrap_servers")
            .option("subscribe", "topic1")    
            .option("startingOffsets", "earliest")   
            .option("maxOffsetsPerTrigger", 100)  
            .option("request.timeout.ms", "60000")
            .option("session.timeout.ms", "30000") 
            .load()  
            .select(col("value").cast(StringType()).alias("json"))  
            .select(from_json("json", jsonSchema).alias("data"))  
            .select("data.*"))  
      
        return df
    

    Results:

    
    Name    Type
    field1  string
    field2  string