Tags: arrays, r, apache-spark-sql, sparkr

SparkR - extracting dataframe's array<int> for an R function


I have thousands of sensors. I need to partition the data (i.e., per sensor per day) and then submit each list of data points to an R algorithm. Using Spark, a simplified sample looks like:

// Spark (Scala)
val rddData = List(
  ("1:3", List(1,1,456,1,1,2,480,0,1,3,425,0)),
  ("1:4", List(1,4,437,1,1,5,490,0)),
  ("1:6", List(1,6,500,0,1,7,515,1,1,8,517,0,1,9,522,0,1,10,525,0)),
  ("1:11", List(1,11,610,1))
)

case class DataPoint(
  key: String,
  value: List[Int])  // 4-value pattern: sensorID:seq#, seq#, value, state

I convert this to a Parquet file and save it. Loading the Parquet file in SparkR works fine; the schema shows:

#SparkR
df <- read.df(sqlContext, filespec, "parquet")
schema(df)
StructType
|-name = "key", type = "StringType", nullable = TRUE
|-name = "value", type = "ArrayType(IntegerType,true)", nullable = TRUE

So in SparkR, I have a DataFrame where each record has all of the data I want (df$value). I want to extract that array into something R can consume, then mutate my original DataFrame (df) with a new column holding the resultant array. Logically, something like results = function(df$value). Then I need to get those results (for all rows) back into a SparkR DataFrame for output.

How do I extract an array from the SparkR DataFrame and then mutate the DataFrame with the results?


Solution

  • Let the SparkR DataFrame be df and the plain R data frame be df_r. To convert the SparkR DataFrame to an R data frame, use:

    df_r <- collect(df)

    With the R data frame df_r, you can do all the computations you need in plain R; the collected ArrayType column should arrive as an R list, so each element of df_r$value is an integer vector. Say you put the result in a column df_r$result, as sketched below.
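
    A minimal sketch of that step, with mean() standing in as a placeholder for the real per-sensor algorithm:

    # each element of df_r$value is an integer vector;
    # replace mean() with the actual R algorithm
    df_r$result <- sapply(df_r$value, mean)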

    Then, to convert back to a SparkR DataFrame, use:

    # this creates a new SparkR DataFrame, df_1
    df_1 <- createDataFrame(sqlContext, df_r)
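
    As a quick sanity check on the round trip (usage sketch):

    # confirm the schema and a few rows of the rebuilt SparkR DataFrame
    printSchema(df_1)
    head(select(df_1, "key", "result"))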
    
    To add the result back to the original SparkR DataFrame df, use:

    # this adds df_1$result to df as a new column, df$result
    # note: df and df_1 must have the same number (and order) of rows;
    # if not, use a join instead (see the sketch below)
    df$result <- df_1$result
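
    If the row counts or ordering can differ, a key-based join is the safer route. A sketch using SparkR's join and select, assuming the key column survived the collect()/createDataFrame() round trip:

    # join on the sensor key, then keep one copy of each column
    joined <- join(df, df_1, df$key == df_1$key)
    df_out <- select(joined, df$key, df$value, df_1$result)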
    

    Hope this solves your problem.