I am trying to create and apply a Spark ml_pipeline object that can handle an external parameter that will vary (typically a date). According to the Spark documentation, it seems possible: see the part about ParamMap here. However, I haven't figured out exactly how to do it. I was thinking of something like this:
library(sparklyr)
library(dplyr)

table.df <- data.frame("a" = c(1, 2, 3))
table.sdf <- sdf_copy_to(sc, table.df)

param = 5
param2 = 4

# operation declaration
table2.sdf <- table.sdf %>%
  mutate(test = param)
# pipeline creation
pipeline_1 = ml_pipeline(sc) %>%
  ft_dplyr_transformer(table2.sdf) %>%
  ml_fit(table.sdf, list("param" = param))

# pipeline application with another value for param
table2.sdf <- pipeline_1 %>%
  ml_transform(table.sdf, list("param" = param2))

# result
glimpse(table2.sdf %>% select(test))
# doesn't work...
That's really not how Spark ML Pipelines are intended to be used. In general, all transformations required to convert the input dataset into a format suitable for the Pipeline should be applied beforehand, and only the common components should be embedded as stages.
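In sparklyr terms, a minimal sketch of that approach, reusing the objects from the question, could look like the following (the prepare helper and the double_a column are purely illustrative, not part of any API):

# shared, parameter-free step: the only thing embedded in the pipeline
common.sdf <- table.sdf %>%
  mutate(double_a = a * 2)

pipeline <- ml_pipeline(sc) %>%
  ft_dplyr_transformer(common.sdf)

# varying, parameter-dependent step: applied to the data before fit / transform
prepare <- function(sdf, p) sdf %>% mutate(test = !!p)

model <- pipeline %>% ml_fit(prepare(table.sdf, param))
result <- model %>% ml_transform(prepare(table.sdf, param2))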
When using the native (Scala) API, it is technically possible, in simple cases like this one, to use an empty SQLTransformer:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.SQLTransformer
import org.apache.spark.ml.param.ParamPair

val df = spark.range(1, 4).toDF("a")

// statement is intentionally left unset here and supplied later as a Param
val sqlTransformer = new SQLTransformer()
val pipeline = new Pipeline().setStages(Array(sqlTransformer))
and supply the statement Param for both fit:
val model = pipeline.fit(
df,
ParamPair(sqlTransformer.statement, "SELECT *, 4 AS `test` FROM __THIS__")
)
model.transform(df).show
+---+----+
| a|test|
+---+----+
| 1| 4|
| 2| 4|
| 3| 4|
+---+----+
and transform:
model.transform(
df,
ParamPair(sqlTransformer.statement, "SELECT *, 5 AS `test` FROM __THIS__")
).show
+---+----+
| a|test|
+---+----+
| 1| 5|
| 2| 5|
| 3| 5|
+---+----+
but neither ml_fit nor ml_transform / ml_predict supports additional Params at the moment (as you can see, ... are simply ignored).
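A hedged sketch of what that means in practice for sparklyr: since the extra list of Params is ignored, changing the parameter currently means regenerating the dplyr-transformer stage (and refitting) rather than passing the value at fit / transform time. The build_pipeline helper below is just illustrative:

# rebuild the parameter-dependent stage for each value of the parameter
build_pipeline <- function(p) {
  ml_pipeline(sc) %>%
    ft_dplyr_transformer(table.sdf %>% mutate(test = !!p))
}

model_5 <- build_pipeline(param)  %>% ml_fit(table.sdf)
model_4 <- build_pipeline(param2) %>% ml_fit(table.sdf)

model_4 %>%
  ml_transform(table.sdf) %>%
  glimpse()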