apache-spark, spark-structured-streaming

spark structured streaming dynamic string filter


We are trying to use a dynamic filter for a Structured Streaming application.

Let's say we have the following pseudo-implementation of a Spark Structured Streaming application:

spark.readStream()
     .format("kafka")
     .option(...)
     ...
     .load()
     .filter(getFilter()) // <-- dynamic stuff: def filter(conditionExpr: String)
     .writeStream()
     .format("kafka")
     .option(.....)
     .start();

and getFilter returns a String:

String getFilter() {
   // dynamic stuff to create the expression
   return expression; // e.g. "column = true";
}

Is it possible in the current version of Spark to have a dynamic filter condition? I mean, the getFilter() method should dynamically return a filter condition (let's say it's refreshed every 10 minutes). We tried looking into broadcast variables, but we're not sure whether Structured Streaming supports such a thing.

It looks like it's not possible to update a job's configuration once it's submitted. We deploy on YARN.

Any suggestion or option is highly appreciated.


EDIT: assume getFilter() returns:

(columnA = 1 AND columnB = true) OR customHiveUDF(columnC, 'input') != 'required' OR columnD > 8

After 10 minutes there can be a small change (dropping the first expression before the first OR), and potentially there can be a new expression (columnA = 2), e.g.:

customHiveUDF(columnC, 'input') != 'required' OR columnD > 10 OR columnA = 2

The goal is to have multiple filters in one Spark application without submitting multiple jobs.


Solution

  • A broadcast variable should be OK here. You can write a typed filter like:

    query.filter(x => x > bv.value).writeStream(...)
    

    where bv is a Broadcast variable. You can update it as described here: How can I update a broadcast variable in spark streaming?
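
    For reference, here is a minimal Scala sketch of that typed-filter approach; the Kafka options, the console sink and the initialThreshold() helper are placeholders of mine, and refreshing the broadcast itself is the part covered by the linked question:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._

    // placeholder for whatever produces the initial filter value
    def initialThreshold(): Long = 8L

    // broadcast the value once; see the linked question for a refresh scheme
    var bv = spark.sparkContext.broadcast(initialThreshold())

    val query = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host:9092")   // placeholder
      .option("subscribe", "input-topic")               // placeholder
      .load()
      .selectExpr("CAST(value AS STRING) AS value")     // assumes the value is a numeric string
      .as[String]
      .filter(x => x.toLong > bv.value)                 // typed filter reads the broadcast value per row
      .writeStream
      .format("console")                                // placeholder sink
      .start()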

    Another solution is to provide e.g. an RPC or RESTful endpoint and ask that endpoint every 10 minutes. For example (Java, because it's simpler here):

    class EndpointProxy {

         static final long refreshRate = 10 * 60 * 1000L; // e.g. 10 minutes
         static Configuration lastValue;
         static long lastUpdated;

         public static Configuration getConfiguration() {
              // refresh only when the cached value is older than refreshRate
              if (lastUpdated + refreshRate < System.currentTimeMillis()) {
                   lastUpdated = System.currentTimeMillis();
                   lastValue = askMyAPI(); // call your endpoint here
              }
              return lastValue;
         }
    }
    
    
    query.filter (x => x > EndpointProxy.getConfiguration().getX()).writeStream()
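
    A rough Scala counterpart of the same idea, written as an object so the cached value lives once per executor JVM; refreshRateMs and fetchFromEndpoint() are placeholders for your real settings and endpoint call:

    object EndpointProxy {
      private val refreshRateMs = 10 * 60 * 1000L        // refresh every 10 minutes
      @volatile private var lastValue: Long = 0L
      @volatile private var lastUpdated: Long = 0L

      def currentThreshold(): Long = {
        val now = System.currentTimeMillis()
        if (now - lastUpdated > refreshRateMs) synchronized {
          if (now - lastUpdated > refreshRateMs) {        // re-check inside the lock
            lastValue = fetchFromEndpoint()               // placeholder: call your REST/RPC endpoint
            lastUpdated = now
          }
        }
        lastValue
      }

      private def fetchFromEndpoint(): Long = 8L          // placeholder implementation
    }

    It can then be called from a typed filter exactly like the Java version above.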
    

    Edit: hacky workaround for user's problem:

    You can create a 1-row view:

    // confsDF should be in some driver-side singleton
    var confsDF = Seq(some content).toDF("someColumn")

    and then use:
    query.crossJoin(confsDF.as("conf")) // cross join as we have only 1 value 
          .filter("hiveUDF(conf.someColumn)")
          .writeStream()...
    
     // driver-side refresher thread that swaps the singleton DataFrame
     new Thread {
         override def run(): Unit = {
             confsDF = Seq(some new data).toDF("someColumn")
         }
     }.start()
    

    This hack relies on Spark's default execution model, micro-batches. On each trigger the query is rebuilt, so the new data should be taken into consideration.

    You can also do the following in the thread:

    Seq(some new data).toDF("someColumn").createOrReplaceTempView("conf")
    

    and then in the query:

    .crossJoin(spark.table("conf"))
    

    Both should work. Keep in mind that it won't work with Continuous Processing mode.
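
    To make the temp-view variant concrete, here is a minimal end-to-end sketch; the Kafka options, the console sink and loadNewConfig() are placeholders of mine, hiveUDF is assumed to be already registered as in the snippet above, and picking up the replaced view on each trigger relies on the micro-batch rebuild described earlier:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    import spark.implicits._

    // placeholder for whatever produces the refreshed configuration value
    def loadNewConfig(): String = "required"

    // initial 1-row configuration view
    Seq(loadNewConfig()).toDF("someColumn").createOrReplaceTempView("conf")

    // driver-side thread that replaces the view every 10 minutes
    new Thread {
      override def run(): Unit = {
        while (true) {
          Thread.sleep(10 * 60 * 1000L)
          Seq(loadNewConfig()).toDF("someColumn").createOrReplaceTempView("conf")
        }
      }
    }.start()

    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host:9092")   // placeholder
      .option("subscribe", "input-topic")               // placeholder
      .load()
      .crossJoin(spark.table("conf").as("conf"))        // 1-row join, as above
      .filter("hiveUDF(conf.someColumn)")               // same placeholder expression as above
      .writeStream
      .format("console")                                // placeholder sink
      .start()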