Tags: r, apache-spark, sparklyr

Error in filtering after join in sparklyr


My code looks like this (it's just a simple join):

tbl(sc, 'dez') %>% inner_join(tbl(sc, 'deg'), by = c("timefrom" = "timefromdeg", "elemuid")) %>% 
    filter(number.x > 2500) %>% glimpse()

The content of the individual data frames doesn't matter. The join itself works. To save computing resources I would like to filter (or do other operations) directly after joining.

But now I get an error saying that Spark can't resolve the variable number.x.

I don't understand it, because that very variable is listed among the input columns in the error message:

Error: org.apache.spark.sql.AnalysisException: cannot resolve '`number.x`' given input columns: [elemname.x, kind.y, timefrom, timetodeg, timeto, kind.x, elemuid, elemname.y, number.y, number.x]; line 7 pos 7;
'Project [*]
+- 'Filter ('number.x > 2500.0)
   +- SubqueryAlias yoxgbdyqlw
      +- Project [elemuid#7505 AS elemuid#7495, elemname#7506 AS elemname.x#7496, kind#7507 AS kind.x#7497, number#7508 AS number.x#7498, timefrom#7509 AS timefrom#7499, timeto#7510 AS timeto#7500, elemname#7512 AS elemname.y#7501, kind#7513 AS kind.y#7502, number#7514 AS number.y#7503, timetodeg#7516 AS timetodeg#7504]
         +- Join Inner, ((timefrom#7509 = timefromdeg#7515) && (elemuid#7505 = elemuid#7511))
            :- SubqueryAlias TBL_LEFT
            :  +- SubqueryAlias dez
            :     +- HiveTableRelation `default`.`dez`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [elemuid#7505, elemname#7506, kind#7507, number#7508, timefrom#7509, timeto#7510]
            +- SubqueryAlias TBL_RIGHT
               +- SubqueryAlias deg
                  +- HiveTableRelation `default`.`deg`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [elemuid#7511, elemname#7512, kind#7513, number#7514, timefromdeg#7515, timetodeg#7516]

A collect() after joining is not an option, because then I run out of memory. Is there any possibility to make this work?

I would be very grateful for any help!


Solution

  • TL;DR Don't use the default suffixes (c(".x", ".y")):

    set.seed(1)
    
    df1 <- copy_to(sc, tibble(id = 1:3, value = rnorm(3)))
    df2 <- copy_to(sc, tibble(id = 1:3, value = rnorm(3)))
    
    df1 %>% 
      inner_join(df2, by = c("id"), suffix=c("_x", "_y")) %>% 
      filter(value_y > -0.836)
    
    # # Source:   lazy query [?? x 3]
    # # Database: spark_connection
    #      id value_x value_y
    #   <dbl>   <dbl>   <dbl>
    # 1    1.  -0.626   1.60 
    # 2    2.   0.184   0.330
    # 3    3.  -0.836  -0.820
    
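    Applied to the query from the question, that should look something like this
    (both input tables have a number column, so after the join it becomes number_x):

    tbl(sc, 'dez') %>%
      inner_join(tbl(sc, 'deg'),
                 by = c("timefrom" = "timefromdeg", "elemuid"),
                 suffix = c("_x", "_y")) %>%
      filter(number_x > 2500) %>%
      glimpse()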

    The problem:

    Spark allows deeply nested structures and complex types. Fields of a struct are accessed using dot syntax, with the full path to the field, so a name like number.x is ambiguous: Spark tries to read it as field x of a column called number.

    Normally we would escape such a name with backticks

    `number.x`
    

    but as far as I am aware it is not possible to express this through the dplyr API (maybe some rlang tricks would do, but I cannot think of any right now).

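    For illustration only, the backtick escaping does work if you hand the SQL to Spark yourself. A rough sketch, assuming the joined result is first registered as a temporary view (the name joined_tmp is made up here):

    joined <- tbl(sc, 'dez') %>%
      inner_join(tbl(sc, 'deg'),
                 by = c("timefrom" = "timefromdeg", "elemuid")) %>%
      sdf_register("joined_tmp")

    # backticks let Spark SQL resolve the dotted name; the query stays lazy
    tbl(sc, sql("SELECT * FROM joined_tmp WHERE `number.x` > 2500"))
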
    The problem is not really specific to joins. You should avoid having . in column names altogether. If for some reason you do, you can always drop down to the native Spark API and resolve the issue:

    df3 <- copy_to(sc, tibble(value.x = rnorm(42)))
    
    df3 %>% 
      spark_dataframe() %>% 
      invoke("withColumnRenamed", "`value.x`", "value_x") %>%
      sdf_register()
    
    # # Source:   table<sparklyr_tmp_61acdbbc592> [?? x 1]
    # # Database: spark_connection
    #    value_x
    #      <dbl>
    #  1 -0.0162
    #  2  0.944 
    #  3  0.821 
    #  4  0.594 
    #  5  0.919 
    #  6  0.782 
    #  7  0.0746
    #  8 -1.99  
    #  9  0.620 
    # 10 -0.0561
    # # ... with more rows
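
    The same renaming trick can be applied to the joined table from the question before filtering, along these lines (a sketch using the tables from the question):

    tbl(sc, 'dez') %>%
      inner_join(tbl(sc, 'deg'), by = c("timefrom" = "timefromdeg", "elemuid")) %>%
      spark_dataframe() %>%
      invoke("withColumnRenamed", "`number.x`", "number_x") %>%
      sdf_register() %>%
      filter(number_x > 2500) %>%
      glimpse()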