
Value matching under sparklyr


I tried value matching under sparklyr using:

spark_parquet %>% filter(customer_id %in% spark_unique_customer_ids)

However, I received the following error:

Error in UseMethod("escape") : no applicable method for 'escape' applied to an object of class "c('tbl_spark', 'tbl_sql', 'tbl_lazy', 'tbl')"

Any suggestions on how to tackle this?


Solution

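  • The error occurs because %in% expects a local R vector on its right-hand side. When given a Spark table, dplyr tries to escape it as a literal value for the generated SQL and fails. It is best to use semi_join instead: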

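    library(sparklyr)
    library(dplyr)

    # Assumes a local Spark installation; replace with your own connection.
    sc <- spark_connect(master = "local")

    # Small example tables standing in for the real data.
    spark_parquet <- copy_to(
      sc,
      tibble(customer_id = c(1, 2, 3), value = c(-1, 0, 1))
    )
    spark_unique_customer_ids <- copy_to(sc, tibble(customer_id = c(1, 3)))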
    
    spark_parquet %>% semi_join(spark_unique_customer_ids, by = "customer_id")
    
    # Source: spark<?> [?? x 2]
      customer_id value
    *       <dbl> <dbl>
    1           1    -1
    2           3     1
    

    The SQL API should work as well:

    # Register both tables under names that the SQL query can refer to.
    spark_parquet %>% sdf_register("spark_parquet")
    spark_unique_customer_ids %>% sdf_register("spark_unique_customer_ids")
    
    # Run the query on the Spark session and wrap the result as a tbl_spark.
    sc %>% spark_session() %>% 
      invoke(
        "sql", 
        "SELECT * FROM spark_parquet 
         WHERE customer_id IN (
           SELECT customer_id FROM spark_unique_customer_ids)") %>% 
      sdf_register()
    
    # Source: spark<?> [?? x 2]
      customer_id value
    *       <dbl> <dbl>
    1           1    -1
    2           3     1
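
If the list of ids is small, another option is to collect it into a local R vector first, since %in% with a local vector is translated to a SQL IN (...) list. The following is only a sketch under assumptions: it assumes a local Spark installation, the table names passed to copy_to and the variable unique_customer_ids are chosen here for illustration, and pulling the ids to the driver is only sensible when there are few of them.

```r
library(sparklyr)
library(dplyr)

# Assumption: a local Spark installation is available for this sketch.
sc <- spark_connect(master = "local")

# Same toy data as in the answer; the table names are illustrative.
spark_parquet <- copy_to(
  sc,
  tibble(customer_id = c(1, 2, 3), value = c(-1, 0, 1)),
  name = "spark_parquet",
  overwrite = TRUE
)
spark_unique_customer_ids <- copy_to(
  sc,
  tibble(customer_id = c(1, 3)),
  name = "spark_unique_customer_ids",
  overwrite = TRUE
)

# Collect the ids into an ordinary R vector (this moves data to the driver).
unique_customer_ids <- spark_unique_customer_ids %>%
  distinct(customer_id) %>%
  pull(customer_id)

# With a local vector on the right-hand side, the filter can be
# translated to SQL; `!!` inlines the vector's values into the query.
spark_parquet %>% filter(customer_id %in% !!unique_customer_ids)

spark_disconnect(sc)
```

For large id sets, semi_join is likely the better choice because the matching stays inside Spark.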