
Spark - Java - Filter Streaming Queries


I have a Spark application that reads data from Kafka into a DataFrame:

Dataset<Row> df = spark.readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topic")
    .load()
    .selectExpr("CAST(key AS STRING) as key");
// first() is an eager action, called here on a streaming Dataset
String my_key = df.select("key").first().getString(0);
if ("a".equals(my_key)) { // compare string contents, not references
    // do_stuff
}

Basically, if the key is a, I need to apply one set of transformations to the DataFrame; otherwise I need to apply a different set.

However, I am dealing with streaming queries, and when I ran the code above I got:

Queries with streaming sources must be executed with writeStream.start()

The error happens when I call first().

Does anyone have any ideas?

Thanks in advance :)


Solution

  • I was able to solve my problem using:

    Dataset<Row> df = spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "topic")
        .load()
        .selectExpr("CAST(key AS STRING) as key")
        .filter(functions.col("key").contains("a"));
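
The original error comes from first(), which is an eager action and cannot run on a streaming Dataset. filter is a lazy transformation, so it is allowed. Note that contains("a") matches any key that has a as a substring, while equalTo("a") matches only the exact key. The sketch below is one possible way to get the "if a, otherwise" behaviour from the question: two filtered branches, each started with writeStream().start(). The class name KeyBranching, the helper method names, the placeholder branch column, the console sink and the local[*] master are all assumptions for illustration and are not part of the original code.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.not;

public class KeyBranching {

    // Rows whose key is exactly "a". The withColumn call is a placeholder
    // standing in for the real "key is a" transformations.
    static Dataset<Row> keyIsA(Dataset<Row> df) {
        return df.filter(col("key").equalTo("a"))
                 .withColumn("branch", lit("a"));
    }

    // All remaining rows. eqNullSafe treats a null key as "not a",
    // so null keys end up in this branch instead of being dropped.
    static Dataset<Row> keyIsNotA(Dataset<Row> df) {
        return df.filter(not(col("key").eqNullSafe("a")))
                 .withColumn("branch", lit("other"));
    }

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("KeyBranching")
                .master("local[*]") // assumption: local run for testing
                .getOrCreate();

        Dataset<Row> df = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "topic")
                .load()
                .selectExpr("CAST(key AS STRING) AS key");

        // Each branch is its own streaming query and must be started explicitly.
        // The console sink is an assumption; replace it with the real sink.
        keyIsA(df).writeStream().format("console").start();
        keyIsNotA(df).writeStream().format("console").start();

        // Block until one of the queries stops or fails.
        spark.streams().awaitAnyTermination();
    }
}
```

Each started query reads the topic independently, so this approach consumes the stream twice. When both branches can be written as column expressions, a single query using functions.when(...).otherwise(...) may be preferable.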