Tags: r, apache-spark, sparklyr

What is the equivalent of R's list() function in sparklyr?


Below is some sample R code. I would like to do the same in sparklyr.

custTrans1 <- Pdt_table %>% 
  group_by(Main_CustomerID) %>% 
  summarise(Invoice = as.vector(list(Invoice_ID)),Industry = as.vector(list(Industry)))

where Pdt_table is a Spark DataFrame and Main_CustomerID, Invoice_ID, and Industry are columns.

I would like to create a list of the above variables and convert it to a vector. How can I do this in sparklyr?


Solution

  • You can use collect_list (which keeps duplicates) or collect_set (which drops them):

    set.seed(1)
    df <- copy_to(
      sc, tibble(group = rep(c("a", "b"), 3), value = runif(6)),
      name = "df"
    )
    
    result <- df %>% group_by(group) %>% summarise(values = collect_list(value))
    result
    
    # Source:   lazy query [?? x 2]
    # Database: spark_connection
      group values    
      <chr> <list>    
    1 b     <list [3]>
    2 a     <list [3]>
    

    which is translated to the following query:

    result %>% show_query()
    
    <SQL>
    SELECT `group`, COLLECT_LIST(`value`) AS `values`
    FROM `df`
    GROUP BY `group`
    

    with the corresponding execution plan (optimizedPlan() here is a helper that retrieves the plan from the underlying Java object, not a built-in sparklyr function):

    result %>% optimizedPlan()
    
    <jobj[213]>
      org.apache.spark.sql.catalyst.plans.logical.Aggregate
      Aggregate [group#259], [group#259, collect_list(value#260, 0, 0) AS values#345]
    +- InMemoryRelation [group#259, value#260], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `df`
          +- Scan ExistingRDD[group#259,value#260]
    

    and schema (with array<...> column):

    root
     |-- group: string (nullable = true)
     |-- values: array (nullable = true)
     |    |-- element: double (containsNull = true)
    

    Please keep in mind that:

    • An operation like this one is very expensive in a distributed system.
    • Depending on the data distribution, it might not be feasible: all values for a group must fit in memory on a single executor.
    • Complex types are somewhat hard to handle in Spark in general, and sparklyr, with its tidy data focus, doesn't make things easier. To process the result efficiently you may need a Scala extension.
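If the aggregated result is small enough to collect() back into R, the array<double> column arrives as an ordinary R list-column, which you can then unpack locally. A minimal sketch in plain base R (no Spark connection; result_local is a hypothetical stand-in for what collect(result) would return, with made-up values):

```r
# Simulate the collected result: one row per group, with a list-column
result_local <- data.frame(group = c("a", "b"), stringsAsFactors = FALSE)
result_local$values <- list(c(0.27, 0.91, 0.94), c(0.37, 0.20, 0.66))

# Flatten the list-column back into one row per element
flat <- data.frame(
  group = rep(result_local$group, lengths(result_local$values)),
  value = unlist(result_local$values),
  stringsAsFactors = FALSE
)
flat  # 6 rows: three "a" values followed by three "b" values
```

This is only practical after aggregation has shrunk the data; collecting a large Spark DataFrame defeats the purpose of distributing the work in the first place.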