I have some problems using the ft_.. functions from the sparklyr R package. ft_bucketizer works, but ft_normalizer and ft_min_max_scaler do not. Here is an example:
library(sparklyr)
library(dplyr)
library(nycflights13)  # assumed source of the flights data frame
sc <- spark_connect(master = "local", version = "2.1.0")
x <- flights %>% select(dep_delay)
x_tbl <- sdf_copy_to(sc, x)
# works fine
ft_binarizer(x = x_tbl, input.col = "dep_delay", output.col = "delayed", threshold = 0)
# error
ft_normalizer(x = x_tbl, input.col = "dep_delay", output.col = "delayed_norm")
# error
ft_min_max_scaler(x = x_tbl, input.col = "dep_delay", output.col = "delayed_min_max")
The normalizer returns:
"Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 9, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$1: (double) => vector)"
The min_max_scaler returns:
"Error: java.lang.IllegalArgumentException: requirement failed: Column dep_delay must be of type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 but was actually DoubleType."
I think it is a problem with the data type, but I don't know how to solve it. Does anybody have an idea what to do?
Many thanks in advance!
ft_normalizer operates on Vector columns, so you have to assemble the input column with ft_vector_assembler first:
ft_vector_assembler(x_tbl, input_cols="dep_delay", output_col="dep_delay_v") %>%
  ft_normalizer(input.col = "dep_delay_v", output.col = "delayed_v_norm")
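The same approach should carry over to ft_min_max_scaler, whose error message in the question also asks for a Vector column. Below is a minimal sketch, not a definitive fix. It assumes a newer sparklyr release where the arguments are spelled input_col / output_col (older releases used input.col / output.col), that flights comes from nycflights13, and that rows with a missing dep_delay should be dropped before assembling:

```r
library(sparklyr)
library(dplyr)
library(nycflights13)  # assumed source of the flights data frame

sc <- spark_connect(master = "local")

# Assumption: the assembler may reject missing values, so drop them first
x <- flights %>%
  select(dep_delay) %>%
  filter(!is.na(dep_delay))
x_tbl <- sdf_copy_to(sc, x, overwrite = TRUE)

# Wrap the double column in a Vector column, then rescale that Vector column
scaled <- x_tbl %>%
  ft_vector_assembler(input_cols = "dep_delay", output_col = "dep_delay_v") %>%
  ft_min_max_scaler(input_col = "dep_delay_v", output_col = "delayed_min_max")

# The result keeps the original column and adds the two new ones
colnames(scaled)
```

Note that delayed_min_max is itself a Vector column, so it has to be unpacked if a plain numeric column is needed afterwards.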