Tags: databricks, azure-databricks, azure-eventhub, delta-lake

Event Hub: org.apache.spark.sql.AnalysisException: Required attribute 'body' not found


I am trying to write change data capture (CDC) data from a Delta table into Azure Event Hubs. The stream is read as:

df = spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("cdc_test1")

When writing to Azure Event Hubs, the connector expects the content to be in a body attribute:

df.writeStream.format("eventhubs") \
  .option("checkpointLocation", checkpointLocation) \
  .outputMode("append") \
  .options(**ehConf) \
  .start()

It throws the following exception:

org.apache.spark.sql.AnalysisException: Required attribute 'body' not found.
    at org.apache.spark.sql.eventhubs.EventHubsWriter$.$anonfun$validateQuery$2(EventHubsWriter.scala:53)

I am not sure how to wrap the whole stream into a body. I think I need another stream object with a body column that holds the contents of df (the original stream) as a string, but I have not been able to achieve this. Please help!


Solution

  • You just need to create this column yourself: use struct to pack all columns into a single object, then a function such as to_json to turn that object into a single value. Other functions, such as to_csv or to_avro, would also work; which one to pick depends on the contract with your consumers. The code could look like this:

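    from pyspark.sql import functions as F

    # Pack every column into one struct, serialize it to JSON, and name the result "body"
    df.select(F.to_json(F.struct("*")).alias("body")) \
        .writeStream.format("eventhubs") \
        .option("checkpointLocation", checkpointLocation) \
        .outputMode("append") \
        .options(**ehConf).start()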
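
Since the choice of encoding depends on the contract with consumers, it may help to see what a JSON body could look like on the receiving side. The following is a minimal sketch in plain Python, not part of the original answer: the helper name split_change_event, the sample payload, and the table columns id and name are all hypothetical. It assumes the body was produced with to_json over all columns, so it also carries the metadata columns that Delta's change data feed adds (_change_type, _commit_version, _commit_timestamp).

```python
import json

# Metadata columns that Delta's change data feed adds to each change row.
CDF_COLUMNS = ("_change_type", "_commit_version", "_commit_timestamp")


def split_change_event(body):
    """Split a JSON-encoded change row into (change metadata, row data)."""
    # Event bodies typically arrive as bytes; accept str as well.
    if isinstance(body, (bytes, bytearray)):
        body = body.decode("utf-8")
    record = json.loads(body)
    meta = {k: record[k] for k in CDF_COLUMNS if k in record}
    data = {k: v for k, v in record.items() if k not in CDF_COLUMNS}
    return meta, data


# Made-up payload for illustration; "id" and "name" are hypothetical columns.
sample = (
    b'{"id": 1, "name": "a", "_change_type": "insert", '
    b'"_commit_version": 1, "_commit_timestamp": "2022-01-01T00:00:00.000Z"}'
)

meta, data = split_change_event(sample)
print(meta["_change_type"], data)
```

A consumer could then branch on meta["_change_type"] to decide how to apply each row. If the consumers expect a different layout, it is probably easier to shape the struct on the Spark side before calling to_json than to untangle it downstream.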