I have a Spark application that receives data in a DataFrame:
Dataset<Row> df = spark.readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topic")
    .load()
    .selectExpr("CAST(key AS STRING) as key");
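// first() is an eager action; getString(0) reads the key without Row's "[...]" wrapper
String my_key = df.select("key").first().getString(0);
// compare string contents with equals(), not references with ==
if ("a".equals(my_key))
{
    // do_stuff: apply the transformations for key "a"
}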
Basically, if the key has the value "a" I need to apply one set of transformations to the DataFrame; otherwise I need to apply different transformations.
However, I am dealing with streaming queries, and when I ran the code above I got:
Queries with streaming sources must be executed with writeStream.start()
The error happens at the call to first().
Anyone have any ideas?
Thanks in advance :)
I was able to solve my problem using:
Dataset<Row> df = spark.readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topic")
    .load()
    .selectExpr("CAST(key AS STRING) as key")
    .filter(functions.col("key").contains("a"));
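For the "otherwise" case from the question, one possible approach is to split the stream into two filtered branches and start each one with writeStream().start(). The following is only a rough sketch: the class name KeyBranching, the helper names, the added "branch" column, and the console sink are all hypothetical placeholders and not part of the original code. Note also that it matches the key exactly with equalTo, whereas contains("a") above matches any key that contains "a".

```java
import java.util.concurrent.TimeoutException;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;

public class KeyBranching {

    // Rows whose key is exactly "a". filter() is lazy, so it is allowed
    // on a streaming Dataset (unlike first(), which is an eager action).
    static Dataset<Row> keyIsA(Dataset<Row> df) {
        return df.filter(functions.col("key").equalTo("a"));
    }

    // Every other row. Kafka keys may be null, and a comparison with null
    // would otherwise drop such rows from both branches.
    static Dataset<Row> keyIsNotA(Dataset<Row> df) {
        return df.filter(functions.col("key").isNull()
                .or(functions.col("key").notEqual("a")));
    }

    // Placeholder transformation and sink (assumptions): tag each row with
    // the branch it took and print it to the console.
    static StreamingQuery startBranch(Dataset<Row> branch, String label)
            throws TimeoutException {
        return branch
                .withColumn("branch", functions.lit(label))
                .writeStream()
                .format("console")
                .start();
    }
}
```

With the streaming df from the question, startBranch(keyIsA(df), "a") and startBranch(keyIsNotA(df), "other") would start one query per branch, and spark.streams().awaitAnyTermination() would keep the application running. The real transformations would replace the withColumn call in each branch.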