The code below mostly works as it should, i.e. data is written to the output table and is selectable from it within 10 seconds. The problem is that foreachBatch is never executed.
When I tested it with .format("console") and called .start() instead, foreachBatch did run, so it feels like .toTable() is to blame here.
This code uses the Kafka connector, but the same problem existed with the Event Hubs connector.
If I try to add .start() after toTable() I get the error
'StreamingQuery' object has no attribute 'start'
Here is the code that works, except for foreachBatch:
TOPIC = "myeventhub"
BOOTSTRAP_SERVERS = "myeventhub.servicebus.windows.net:9093"
EH_SASL = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=mykeyname;SharedAccessKey=mykey;EntityPath=myentitypath;\";"
df = spark.readStream \
    .format("kafka") \
    .option("subscribe", TOPIC) \
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", EH_SASL) \
    .option("kafka.request.timeout.ms", "60000") \
    .option("kafka.session.timeout.ms", "60000") \
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", "earliest") \
    .load()
n = 100
count = 0
def run_command(batchDF, epoch_id):
    global count
    count += 1
    if count % n == 0:
        spark.sql("OPTIMIZE firstcatalog.bronze.factorydatas3 ZORDER BY (readtimestamp)")
...Omitted code where I transform the data in the value column to strongly typed data...
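For reference, that omitted transform is roughly of this shape (the schema and column names here are simplified placeholders for illustration, not the real ones):

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Placeholder schema for illustration only - the real payload has more fields
event_schema = StructType([
    StructField("deviceid", StringType()),
    StructField("readtimestamp", TimestampType()),
    StructField("somecolumn", StringType()),
    StructField("reading", DoubleType()),
])

# Kafka delivers the payload as binary, so cast the value column to string and parse the JSON
myTypedDF = df.selectExpr("CAST(value AS STRING) AS json") \
    .select(from_json(col("json"), event_schema).alias("data")) \
    .select("data.*")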
myTypedDF.writeStream \
    .foreachBatch(run_command) \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/delta/events/_checkpoints/") \
    .partitionBy("somecolumn") \
    .toTable("myunitycatalog.bronze.mytable")
You either do foreachBatch or toTable, but not both. toTable() (just like start()) already starts the query and returns a StreamingQuery object, which is why chaining .start() after it fails with the error above. You can instead move the table write inside the foreachBatch function; just make sure the writes are idempotent, because a batch could be restarted. Change your code to this:
def run_command(batchDF, epoch_id):
    global count
    # Idempotent write: Delta skips the batch if this (txnAppId, txnVersion) pair was already committed
    batchDF.write.format("delta") \
        .option("txnVersion", epoch_id) \
        .option("txnAppId", "my_app") \
        .partitionBy("somecolumn") \
        .mode("append") \
        .saveAsTable("myunitycatalog.bronze.mytable")
    count += 1
    if count % n == 0:
        spark.sql("OPTIMIZE myunitycatalog.bronze.mytable ZORDER BY (readtimestamp)")
myTypedDF.writeStream \
    .foreachBatch(run_command) \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/delta/events/_checkpoints/") \
    .start()
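With the write moved inside foreachBatch, the writeStream itself no longer needs .format("delta") or .toTable(); .start() just launches the query. The txnAppId/txnVersion options are what make the write idempotent: Delta records the highest committed txnVersion per txnAppId, so if a micro-batch is retried with the same epoch_id the duplicate write is skipped rather than appended twice.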