I am trying to write change data capture (CDC) data from a Delta table into Azure Event Hubs:
df = spark.readStream.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 0) \
    .table("cdc_test1")
When writing to Azure Event Hubs, the connector expects the payload in a body attribute:
df.writeStream.format("eventhubs") \
    .option("checkpointLocation", checkpointLocation) \
    .outputMode("append") \
    .options(**ehConf) \
    .start()
This fails with the exception:
org.apache.spark.sql.AnalysisException: Required attribute 'body' not found.
at org.apache.spark.sql.eventhubs.EventHubsWriter$.$anonfun$validateQuery$2(EventHubsWriter.scala:53)
I am not sure how to wrap the whole stream into a body column. I think I need another streaming DataFrame with a body column whose value is each row of df (the original stream) serialized as a string, but I am not able to achieve this. Please help!
You just need to create this column with the struct function (to encode all columns as one object) and something like to_json (to produce a single value from that object; you can also use other functions such as to_csv or to_avro, but that depends on the contract with your consumers). The code could look as follows:
import pyspark.sql.functions as F

# Serialize all columns of each row into a single JSON string
# aliased as 'body', the attribute the Event Hubs writer requires
df.select(F.to_json(F.struct("*")).alias("body")) \
    .writeStream.format("eventhubs") \
    .option("checkpointLocation", checkpointLocation) \
    .outputMode("append") \
    .options(**ehConf).start()
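If the contract with your consumers is Avro rather than JSON, to_avro from pyspark.sql.avro.functions can be dropped in for to_json in the same pattern. A minimal sketch, assuming the spark-avro package (org.apache.spark:spark-avro) is available on the cluster:

from pyspark.sql.avro.functions import to_avro
import pyspark.sql.functions as F

# Same pattern as above, but 'body' becomes Avro-encoded binary
# instead of a JSON string
avro_df = df.select(to_avro(F.struct("*")).alias("body"))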
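On the consuming side, the same JSON contract can be applied in reverse with from_json. A minimal sketch of a downstream reader; the schema below is hypothetical and only for illustration, and must match your Delta table's columns plus the Change Data Feed metadata columns:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Read back from Event Hubs; the connector delivers the payload
# as a binary 'body' column
raw_df = spark.readStream.format("eventhubs").options(**ehConf).load()

# Hypothetical schema for illustration only
schema = StructType([
    StructField("id", LongType()),
    StructField("value", StringType()),
    StructField("_change_type", StringType()),
    StructField("_commit_version", LongType()),
])

# Cast the binary body to string, then parse the JSON back into columns
decoded = raw_df.select(F.from_json(F.col("body").cast("string"), schema).alias("data")) \
    .select("data.*")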