Tags: r, apache-spark, apache-spark-mllib, sparklyr

sparklyr feature transformation functions result in error


I have some problems using the ft_* functions from the sparklyr R package. ft_binarizer works, but ft_normalizer and ft_min_max_scaler do not. Here is an example:

library(sparklyr)
library(dplyr)
library(nycflights13)

sc <- spark_connect(master = "local", version = "2.1.0")
x <- flights %>% select(dep_delay)
x_tbl <- sdf_copy_to(sc, x)

# works fine
ft_binarizer(x = x_tbl, input.col = "dep_delay", output.col = "delayed", threshold = 0)

# error
ft_normalizer(x = x_tbl, input.col = "dep_delay", output.col = "delayed_norm")

# error
ft_min_max_scaler(x = x_tbl, input.col = "dep_delay", output.col = "delayed_min_max")

The normalizer returns:

Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 9, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$1: (double) => vector)

The min_max_scaler returns:

"Error: java.lang.IllegalArgumentException: requirement failed: Column dep_delay must be of type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 but was actually DoubleType."

I think it is a problem with the data type, but I don't know how to solve it. Does anybody have an idea what to do?

Many thanks in advance!


Solution

  • ft_normalizer operates on Vector columns, so you have to use ft_vector_assembler first (the same pattern applies to ft_min_max_scaler, as sketched below):

    # assemble dep_delay into a (length-1) vector column, then normalize it
    ft_vector_assembler(x_tbl, input_cols = "dep_delay", output_col = "dep_delay_v") %>%
      ft_normalizer(input.col = "dep_delay_v", output.col = "delayed_v_norm")
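
  • The same vector-assembly step should also fix ft_min_max_scaler, since it reports the same VectorUDT requirement. A minimal sketch following that pattern (the output column name is just illustrative, and the filter on missing dep_delay values is an assumed precaution in case NAs interfere with fitting the scaler):

    x_tbl %>%
      filter(!is.na(dep_delay)) %>%    # drop NA delays before fitting (assumed precaution)
      ft_vector_assembler(input_cols = "dep_delay", output_col = "dep_delay_v") %>%
      ft_min_max_scaler(input.col = "dep_delay_v", output.col = "delayed_min_max")

  • Note: newer sparklyr releases use snake_case argument names throughout, i.e. input_col / output_col (and input_cols for ft_vector_assembler), in place of input.col / output.col.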