I am trying to process data from Kafka using Spark Structured Streaming. The code for ingesting the data is as follows:
val enriched = df.select($"value" cast "string" as "json")
.select(from_json($"json", schema) as "data")
.select("data.*")
ds
is a DataFrame with the data consumed from Kafka.
The problem comes when I try to read is as JSON in order to do faster queries. the function that comes from org.apache.spark.sql.functions
from_json()
is asking obligatory for a schema. What if the messages have some different fields?
As @zero323 and the answer he or she referenced suggest, you are asking a contradictory question: essentially how does one impose a schema when one doesn't know the schema? One can't of course. I think the idea to use open-ended collection types is your best option.
Ultimately though, it is almost certainly true that you can represent your data with a case class even if it means using a lot of Option
s, strings you need to parse, and maps you need to interrogate. Invest in the effort to define that case class. Otherwise, your Spark jobs will essentially a lot of ad hoc, time-consuming busywork.