We are trying to use a dynamic filter in a Structured Streaming application.
Let's say we have the following pseudo-implementation of a Spark Structured Streaming application:
spark.readStream()
.format("kafka")
.option(...)
...
.load()
.filter(getFilter()) // <-- dynamic stuff: def filter(conditionExpr: String)
.writeStream()
.format("kafka")
.option(.....)
.start();
and getFilter() returns a String:
String getFilter() {
    // dynamic stuff to create the expression
    return expression; // e.g. "column = true";
}
Is it possible in the current version of Spark to have a dynamic filter condition? I mean the getFilter()
method should dynamically return a filter condition (let's say it's refreshed every 10 minutes). We tried to look into broadcast variables but are not sure whether Structured Streaming supports such a thing.
It looks like it's not possible to update the job's configuration once it's submitted. For deployment we use YARN.
Every suggestion/option is highly appreciated.
EDIT:
Assume getFilter() returns:
(columnA = 1 AND columnB = true) OR customHiveUDF(columnC, 'input') != 'required' OR columnD > 8
After 10 minutes we can have a small change (dropping the first expression before the first OR), and potentially we can have a new expression (columnA = 2), e.g.:
customHiveUDF(columnC, 'input') != 'required' OR columnD > 10 OR columnA = 2
The goal is to have multiple filters in one Spark application without submitting multiple jobs.
A broadcast variable should be OK here. You can write a typed filter like:
query.filter(x => x > bv.value).writeStream(...)
where bv is a Broadcast variable. You can update it as described here: How can I update a broadcast variable in spark streaming?
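For illustration only, here is a minimal Scala sketch of the typed-filter idea. The topic names, Kafka options, the threshold value 8, and the length-based predicate are all made up; the sketch only shows a broadcast value being read inside a typed filter, while refreshing the broadcast at runtime is the part covered by the linked question and is not shown here:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("dynamic-filter").getOrCreate()
import spark.implicits._

// broadcast the current threshold from the driver (8 is just an example value)
val bv = spark.sparkContext.broadcast(8)

val filtered = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // placeholder options
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(value AS STRING) AS value")
  .as[String]
  .filter(x => x.length > bv.value)                   // typed filter reading the broadcast value

filtered.writeStream
  .format("console")                                  // sink chosen only for the sketch
  .start()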
Another solution is to provide e.g. an RPC or RESTful endpoint and ask this endpoint every 10 minutes. For example (Java, because it is simpler here):
class EndpointProxy {
    private static final long refreshRate = 10 * 60 * 1000; // 10 minutes
    private static Configuration lastValue;
    private static long lastUpdated;

    public static Configuration getConfiguration() {
        // refresh the cached value only when it is older than refreshRate
        if (System.currentTimeMillis() > lastUpdated + refreshRate) {
            lastUpdated = System.currentTimeMillis();
            lastValue = askMyAPI();
        }
        return lastValue;
    }
}
query.filter(x => x > EndpointProxy.getConfiguration().getX()).writeStream()
Edit: hacky workaround for user's problem:
You can create a 1-row view:
// confsDF should be in some driver-side singleton
var confsDF = Seq(some content).toDF("someColumn")
and then use:
query.crossJoin(confsDF.as("conf")) // cross join as we have only 1 value
.filter("hiveUDF(conf.someColumn)")
.writeStream()...
new Thread(new Runnable {
  override def run(): Unit = {
    confsDF = Seq(some new data).toDF("someColumn")
  }
}).start()
This hack relies on Spark's default execution model - microbatches. In each trigger the query is rebuilt, so the new data should be taken into consideration.
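Putting the pieces together, a rough end-to-end sketch of this workaround (Kafka options, topic names, hiveUDF, the checkpoint path, and the 10-minute refresh loop are placeholders; whether the refreshed confsDF is actually picked up per trigger is the assumption stated above):

import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
import spark.implicits._

// driver-side singleton holding the 1-row configuration DataFrame
object Confs {
  @volatile var confsDF: DataFrame = Seq("initial conf").toDF("someColumn")
}

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")    // placeholder options
  .option("subscribe", "input-topic")
  .load()
  .crossJoin(Confs.confsDF.as("conf"))                 // cross join: conf has exactly one row
  .filter("hiveUDF(conf.someColumn)")                  // dynamic predicate against the conf row
  .select("key", "value")                              // keep only the columns the Kafka sink needs
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoints/dynamic-filter")
  .start()

// background refresh of the configuration row, here every 10 minutes
new Thread(new Runnable {
  override def run(): Unit = while (true) {
    Thread.sleep(10 * 60 * 1000)
    Confs.confsDF = Seq("new conf").toDF("someColumn")
  }
}).start()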
You can also, in the thread, do:
Seq(some new data).toDF("someColumn").createOrReplaceTempView("conf")
and then in query:
.crossJoin(spark.table("conf"))
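For completeness, a rough sketch of the temp-view variant under the same assumptions (here `stream` stands for the Kafka source built in the previous sketch, hiveUDF is still a placeholder, and the view has to be registered before the query starts):

// register the 1-row configuration view before the query starts
Seq("initial conf").toDF("someColumn").createOrReplaceTempView("conf")

// the query resolves the view by name instead of holding a DataFrame reference
stream
  .crossJoin(spark.table("conf").as("conf"))
  .filter("hiveUDF(conf.someColumn)")
  .select("key", "value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoints/dynamic-filter-view")
  .start()

// refresh thread: overwrite the view contents every 10 minutes
new Thread(new Runnable {
  override def run(): Unit = while (true) {
    Thread.sleep(10 * 60 * 1000)
    Seq("new conf").toDF("someColumn").createOrReplaceTempView("conf")
  }
}).start()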
Both should work. Keep in mind that it won't work with Continuous Processing Mode.