Tags: r, apache-spark, dplyr, sparkr, sparklyr

group by and concatenate string using sparklyr


There are a number of questions asking precisely the same thing but none within the context of a sparklyr environment. How does one group by a column and then concatenate the values of some other column as a list?

For example, the following produces the desired output in a local R environment.

library(dplyr)

mtcars %>%
  distinct(gear, cyl) %>%
  group_by(gear) %>%
  summarize(test_list = paste0(cyl, collapse = ";")) %>% 
  select(gear, test_list) %>%
  as.data.frame() %>%
  print()


   gear test_list
1    3     6;8;4
2    4       6;4
3    5     4;8;6

But registering that same table with Spark and running the same code throws a SQL parsing error on the summarize call (see code below), presumably because dplyr tries to translate paste0 and its collapse argument into Spark SQL, which has no direct equivalent. I know PySpark and Spark SQL have the collect_set() function, which achieves the desired effect; is there something analogous for sparklyr?

library(sparklyr)
library(dplyr)

# sc is an existing spark_connection, e.g. from spark_connect(master = "local")
sdf_copy_to(sc, x = mtcars, name = "mtcars_test")

tbl(sc, "mtcars_test") %>%
  distinct(gear, cyl) %>%
  group_by(gear) %>%
  summarize(test_list = paste0(cyl, collapse = ";"))

Error:

Error : org.apache.spark.sql.catalyst.parser.ParseException: 
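
One way to see why the parser complains is to ask dplyr for the SQL it would send to Spark before anything is executed. This is a minimal sketch assuming the same sc connection as above; depending on the dbplyr version it either fails at translation time or prints SQL that Spark cannot parse:

tbl(sc, "mtcars_test") %>%
  distinct(gear, cyl) %>%
  group_by(gear) %>%
  summarize(test_list = paste0(cyl, collapse = ";")) %>%
  show_query()  # renders the generated Spark SQL instead of running it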

In PySpark, the following approach is similar (except that the concatenated column is an array, which can then be collapsed).

from pyspark.sql.functions import collect_set

df2 = spark.table("mtcars_test")
df2.groupby("gear").agg(collect_set('cyl')).createOrReplaceTempView("mtcars_test_cont")

# display() is a Databricks notebook helper; outside Databricks, use .show()
display(spark.table("mtcars_test_cont"))

gear collect_set(cyl)
 3   [8, 4, 6]
 4   [4, 6]
 5   [8, 4, 6]

Solution

  • Instead of using R functions, you can use Spark SQL syntax directly by wrapping it in the sql() function from dbplyr. Below is an example script that produces the desired output:

    sdf_copy_to(sc, x = mtcars, name = "mtcars_test")
    
    tbl(sc, "mtcars_test") %>%
      group_by(gear) %>%
      summarize(test_list = sql("array_join(collect_set(cast(cyl as int)), ';')"))
    
    #>   gear test_list
    #>  <dbl> <chr>    
    #>     4  6;4      
    #>     3  6;4;8    
    #>     5  6;4;8 
    

    I just changed the last line of your code, where you used the paste0 function (a variant that drops the sql() wrapper entirely is sketched at the end of this answer).

    This is one reason why I prefer SparkR over sparklyr: almost all PySpark syntax works the same way in SparkR.

    SparkR::agg(
      SparkR::group_by(SparkR::createDataFrame(mtcars), SparkR::column("gear")),
      test_list = SparkR::array_join(
        SparkR::collect_set(SparkR::cast(SparkR::column("cyl"), "integer")),
        ";"
      )
    ) %>% 
      SparkR::collect()
    
    #>  gear test_list
    #>    4       6;4
    #>    3     6;4;8
    #>    5     6;4;8
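
    A related variant stays entirely within sparklyr and drops the sql() wrapper. This is only a sketch: it relies on dbplyr passing functions it does not recognise (here array_join and collect_set) through to Spark SQL unchanged, so it should produce the same result as the dbplyr sql() query above:

    tbl(sc, "mtcars_test") %>%
      group_by(gear) %>%
      # array_join and collect_set are not dplyr verbs, so dbplyr forwards them
      # verbatim to Spark SQL; as.integer is translated to a CAST to integer
      summarize(test_list = array_join(collect_set(as.integer(cyl)), ";"))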