Issue with using n_distinct in sparklyr to count distinct values based on condition

I'm encountering an issue while trying to count the number of distinct values in a Spark DataFrame column based on a condition using sparklyr. Here's the code I'm using:


library(sparklyr)
library(dplyr)

df <- data.frame(
  appl = c("Apple", "Microsoft", "Google", "Amazon", "Facebook", "Samsung", "IBM"),
  appl_y = c("y", "n", "y", "n", "y", "n", "y"),
  manu = c("USA", "USA", "USA", "China", "USA", "South Korea", "USA"),
  alternate_flag = c("y", "n", "y", "y", "n", "y", "n")
)

# Connect to Spark
sc <- spark_connect(master = "local")

# Create the Spark DataFrame
df_spark <- copy_to(sc, df, "df_spark")

# Group by 'manu' and summarize
result <- df_spark %>%
  group_by(manu) %>%
  summarize(
    num_appl_y = n_distinct(appl[appl_y == 'y']),
    num_appl_flag = n_distinct(appl[alternate_flag == 'y'])
  )

# Show the result
result

The intention is to group the data by the manu column and then, within each group, count the distinct values in the appl column for which appl_y (and, separately, alternate_flag) is 'y'. However, this doesn't work: the counts come out wrong when I do it this way in sparklyr.


  • There's an open issue about this: n_distinct() translation issue #3253.
    Currently, the query gets translated to:

      COUNT(DISTINCT(ARRAY(CASE WHEN (`appl_y` = "y") THEN (`appl`) END))) AS `num_appl_y`,
      COUNT(DISTINCT(ARRAY(CASE WHEN (`alternate_flag` = "y") THEN (`appl`) END))) AS `num_appl_flag`
    FROM `df_spark`
    GROUP BY `manu`

    The NA / empty values in those arrays then throw off the counts.
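
    The effect can be reproduced without Spark. The sketch below uses the USA rows of the sample data and assumes that a row-wise CASE WHEN behaves like ifelse() in R, keeping one slot per row and filling non-matches with NA; the vector name masked is only for illustration.

    ```r
    library(dplyr)

    # The five USA rows from the sample data
    appl   <- c("Apple", "Microsoft", "Google", "Facebook", "IBM")
    appl_y <- c("y", "n", "y", "y", "y")

    # R subsetting drops the non-matching element before counting
    n_distinct(appl[appl_y == "y"])
    #> [1] 4

    # A row-wise conditional keeps the non-matching slot as NA,
    # and n_distinct() counts NA as one more distinct value by default
    masked <- ifelse(appl_y == "y", appl, NA)
    n_distinct(masked)
    #> [1] 5

    # Dropping missing values gives the subset-based count again
    n_distinct(masked, na.rm = TRUE)
    #> [1] 4
    ```

    So a single placeholder for the non-matching rows is enough to push a group's count up by one.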

    Maybe I'm missing something, but what about counting the "y" values with sum(appl_y == "y") and sum(alternate_flag == "y") instead?

    library(sparklyr)
    library(dplyr)

    df <- data.frame(
      appl = c("Apple", "Microsoft", "Google", "Amazon", "Facebook", "Samsung", "IBM"),
      appl_y = c("y", "n", "y", "n", "y", "n", "y"),
      manu = c("USA", "USA", "USA", "China", "USA", "South Korea", "USA"),
      alternate_flag = c("y", "n", "y", "y", "n", "y", "n")
    )
    # Connect to Spark (2.4.3)
    sc <- spark_connect(master = "local")
    # Create the Spark DataFrame
    df_spark <- copy_to(sc, df, "df_spark")
    # Spark summary
    result <- df_spark %>% 
      group_by(manu) %>% 
      summarize(
        # sparklyr / Spark need a little help with boolean to numeric casting
        num_appl_y = sum(as.integer(appl_y == "y")),
        num_appl_flag = sum(as.integer(alternate_flag == "y"))
      )
    # Show the result
    result
    #> Warning: Missing values are always removed in SQL aggregation functions.
    #> Use `na.rm = TRUE` to silence this warning
    #> This warning is displayed once every 8 hours.
    #> # A tibble: 3 × 3
    #>   manu        num_appl_y num_appl_flag
    #>   <chr>            <dbl>         <dbl>
    #> 1 South Korea          0             1
    #> 2 China                0             1
    #> 3 USA                  4             2
    # Show the generated SQL
    show_query(result)
    #> <SQL>
    #> SELECT
    #>   `manu`,
    #>   SUM(CAST(`appl_y` = "y" AS INT)) AS `num_appl_y`,
    #>   SUM(CAST(`alternate_flag` = "y" AS INT)) AS `num_appl_flag`
    #> FROM `df_spark`
    #> GROUP BY `manu`

    For reference, the original n_distinct() summary run with dplyr on the local df:

    # reference summary
    df %>%
      group_by(manu) %>%
      summarize(num_appl_y = n_distinct(appl[appl_y == 'y']),
                num_appl_flag = n_distinct(appl[alternate_flag == 'y']))
    #> # A tibble: 3 × 3
    #>   manu        num_appl_y num_appl_flag
    #>   <chr>            <int>         <int>
    #> 1 China                0             1
    #> 2 South Korea          0             1
    #> 3 USA                  4             2
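
    One caveat with the sum() approach: it counts matching rows, not distinct appl values, so the two agree only while appl is unique within each manu group, as it is in this sample. If duplicates are possible, a sketch along these lines keeps the distinct semantics by filtering first and de-duplicating before counting. It runs on a small local data frame (df_dup is a hypothetical name, with a repeated row added purely for illustration) and has not been checked against Spark here. Note also that groups without any matching row drop out of the result instead of showing 0.

    ```r
    library(dplyr)

    # Hypothetical data with "Apple" listed twice for the same manu
    df_dup <- data.frame(
      appl   = c("Apple", "Apple", "Google", "Amazon"),
      appl_y = c("y", "y", "y", "n"),
      manu   = c("USA", "USA", "USA", "China")
    )

    # sum() counts rows, so the repeated "Apple" is counted twice
    by_rows <- df_dup %>%
      group_by(manu) %>%
      summarize(num_appl_y = sum(as.integer(appl_y == "y")))

    # filter -> distinct -> count keeps one row per (manu, appl) pair
    by_distinct <- df_dup %>%
      filter(appl_y == "y") %>%
      distinct(manu, appl) %>%
      count(manu, name = "num_appl_y")

    by_rows      # USA: 3, China: 0
    by_distinct  # USA: 2, no China row
    ```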