apache-spark, azure-databricks, spark-structured-streaming, delta-lake

Writing stream in Databricks with toTable doesn't execute foreachBatch


The code below works as it should, i.e. data is written to the output table and is selectable from it within 10 seconds. The problem is that foreachBatch is never executed.

When I tested it with .format("console") and called .start(), foreachBatch was executed, so it feels like .toTable() is to blame here.
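
For reference, the console test looked roughly like this (a minimal sketch using the same run_command and myTypedDF shown below; the checkpoint path is illustrative):

    # Console-sink variant of the same stream; with .start(), foreachBatch fires
    query = myTypedDF.writeStream \
        .foreachBatch(run_command) \
        .format("console") \
        .outputMode("append") \
        .option("checkpointLocation", "/tmp/delta/console_test/_checkpoints/") \
        .start()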

This code uses the Kafka connector, but the same problem occurred with the Event Hubs connector.

If I try to add .start() after toTable(), I get the error

'StreamingQuery' object has no attribute 'start'
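
This is because .toTable() already starts the query and returns a StreamingQuery handle rather than a DataStreamWriter, so there is nothing left to .start(). A minimal sketch of what the returned handle does support:

    # .toTable() returns an already-running StreamingQuery
    query = myTypedDF.writeStream.format("delta").toTable("myunitycatalog.bronze.mytable")
    print(query.isActive)      # True: the query started when toTable() was called
    query.awaitTermination()   # block until the stream stops (or call query.stop())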

Here is the code, which works except that foreachBatch is never called:

TOPIC = "myeventhub"
BOOTSTRAP_SERVERS = "myeventhub.servicebus.windows.net:9093"
EH_SASL = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=mykeyname;SharedAccessKey=mykey;EntityPath=myentitypath;\";"

df = spark.readStream \
    .format("kafka") \
    .option("subscribe", TOPIC) \
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", EH_SASL) \
    .option("kafka.request.timeout.ms", "60000") \
    .option("kafka.session.timeout.ms", "60000") \
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", "earliest") \
    .load()

n = 100
count = 0

def run_command(batchDF, epoch_id):
    global count
    count += 1
    if count % n == 0:
        spark.sql("OPTIMIZE firstcatalog.bronze.factorydatas3 ZORDER BY (readtimestamp)")

...Omitted code where I transform the data in the value column to strongly typed data...

myTypedDF.writeStream \
    .foreachBatch(run_command) \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/delta/events/_checkpoints/") \
    .partitionBy("somecolumn") \
    .toTable("myunitycatalog.bronze.mytable")

Solution

  • You can use either foreachBatch or toTable, but not both. Instead, move the table write inside the foreachBatch function; just make sure the writes are idempotent, because a micro-batch can be restarted. Change your code to this:

    def run_command(batchDF, epoch_id):
        global count
        # Idempotent write: Delta skips the append if this
        # (txnAppId, txnVersion) pair has already been committed
        batchDF.write.format("delta") \
            .option("txnVersion", epoch_id) \
            .option("txnAppId", "my_app") \
            .partitionBy("somecolumn") \
            .mode("append") \
            .saveAsTable("myunitycatalog.bronze.mytable")
        count += 1
        # compact and Z-order the table every n batches
        if count % n == 0:
            spark.sql("OPTIMIZE myunitycatalog.bronze.mytable ZORDER BY (readtimestamp)")
    
    myTypedDF.writeStream \
        .foreachBatch(run_command) \
        .outputMode("append") \
        .option("checkpointLocation", "/tmp/delta/events/_checkpoints/") \
        .start()
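
With this pattern a replayed micro-batch is harmless: Delta records the (txnAppId, txnVersion) pair in the transaction log and silently skips an append it has already committed, so the same epoch_id never lands twice. A hypothetical standalone illustration (the table name is made up for the demo):

    # Writing the same data twice with the same (txnAppId, txnVersion):
    # Delta commits the first append and skips the second as a duplicate.
    someDF = spark.range(5)
    for _ in range(2):
        someDF.write.format("delta") \
            .option("txnAppId", "my_app") \
            .option("txnVersion", 42) \
            .mode("append") \
            .saveAsTable("main.sandbox.idempotency_demo")
    # main.sandbox.idempotency_demo now contains 5 rows, not 10

Note also that the query is now launched with .start(): with foreachBatch as the sink there is no table for writeStream to bind, so .toTable() is no longer needed.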