Creating and applying ml_lib pipeline with external parameter in sparklyr


I am trying to create and apply a Spark ml_pipeline object that can handle an external parameter whose value varies between runs (typically a date). According to the Spark documentation, this seems possible: see the part about ParamMap.

I haven't worked out exactly how to do it. I was thinking of something like this:

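library(sparklyr)
library(dplyr)

# assumes a local Spark installation
sc <- spark_connect(master = "local")

table.df <- data.frame("a" = c(1,2,3))
table.sdf <- sdf_copy_to(sc, table.df)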

param = 5
param2 = 4

# operation declaration
table2.sdf <- table.sdf %>% 
  mutate(test = param)

# pipeline creation
pipeline_1 = ml_pipeline(sc) %>%
  ft_dplyr_transformer(table2.sdf) %>%
  ml_fit(table.sdf, list("param" = param))

# pipeline application with another value for param
table2.sdf <- pipeline_1 %>% 
  ml_transform(table.sdf, list("param" = param2))

# result
glimpse(table2.sdf %>% select(test))
# doesn't work...

Solution

  • That's really not how Spark ML Pipelines are intended to be used. In general, all transformations required to convert the input dataset to a format suitable for the Pipeline should be applied beforehand, and only the common components should be embedded as stages.

    When using the native (Scala) API, it is technically possible, in simple cases like this one, to use an empty SQLTransformer:

    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.feature.SQLTransformer
    import org.apache.spark.ml.param.ParamPair
    
    val df = spark.range(1, 4).toDF("a")
    
    val sqlTransformer = new SQLTransformer()
    val pipeline = new Pipeline().setStages(Array(sqlTransformer))
    

    and supply the statement Param for both fit:

    val model = pipeline.fit(
      df,
      ParamPair(sqlTransformer.statement, "SELECT *, 4 AS `test` FROM __THIS__")
    )
    
    model.transform(df).show
    
    +---+----+
    |  a|test|
    +---+----+
    |  1|   4|
    |  2|   4|
    |  3|   4|
    +---+----+
    

    and transform:

    model.transform(
      df,
      ParamPair(sqlTransformer.statement, "SELECT *, 5 AS `test` FROM __THIS__")
    ).show
    
    +---+----+
    |  a|test|
    +---+----+
    |  1|   5|
    |  2|   5|
    |  3|   5|
    +---+----+
    

    but neither ml_fit nor ml_transform / ml_predict supports additional Params at the moment (as you can see, the ... arguments are simply ignored).
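
Since the question mentions ParamMap and a value that changes between runs, here is a minimal Scala sketch of the same idea with the statement built from a variable. It is only an illustration under assumptions: `statementFor` is a hypothetical helper (not part of Spark), the local session settings are made up for the example, and it uses `ParamMap` in place of the bare `ParamPair` shown above, which should behave equivalently. It applies to the Scala API only, not to sparklyr.

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.SQLTransformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.SparkSession

// Assumption: a local session, for illustration only.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("param-sketch")
  .getOrCreate()

val df = spark.range(1, 4).toDF("a")

// Hypothetical helper: builds the SQLTransformer statement for a given value.
def statementFor(value: Int): String =
  s"SELECT *, $value AS `test` FROM __THIS__"

// The transformer is left empty; its statement is supplied per call.
val sqlTransformer = new SQLTransformer()
val pipeline = new Pipeline().setStages(Array(sqlTransformer))

// Fit with one value ...
val model = pipeline.fit(
  df,
  ParamMap(sqlTransformer.statement -> statementFor(4))
)

// ... and transform with another; the override applies to this call only.
val result = model.transform(
  df,
  ParamMap(sqlTransformer.statement -> statementFor(5))
)

result.show()
```

Keeping the statement construction in one small function means the varying value (an integer here, but it could be a date literal) is the only thing that changes between the fit and transform calls.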