Search code examples
apache-sparkapache-spark-sqlspark-structured-streaming

How to use from_json to allow for messages to have different fields?


I am trying to process data from Kafka using Spark Structured Streaming. The code for ingesting the data is as follows:

val enriched = df.select($"value" cast "string" as "json")
  .select(from_json($"json", schema) as "data")
  .select("data.*")

ds is a DataFrame with the data consumed from Kafka.

The problem comes when I try to read is as JSON in order to do faster queries. the function that comes from org.apache.spark.sql.functions from_json() is asking obligatory for a schema. What if the messages have some different fields?


Solution

  • As @zero323 and the answer he or she referenced suggest, you are asking a contradictory question: essentially how does one impose a schema when one doesn't know the schema? One can't of course. I think the idea to use open-ended collection types is your best option.

    Ultimately though, it is almost certainly true that you can represent your data with a case class even if it means using a lot of Options, strings you need to parse, and maps you need to interrogate. Invest in the effort to define that case class. Otherwise, your Spark jobs will essentially a lot of ad hoc, time-consuming busywork.