Tags: r, apache-spark, dplyr, sparklyr

How to LEFT ANTI JOIN in Spark 1.6 via R?


I can set up the following Spark 1.6.0 connection and load these two tables.

library(sparklyr)
library(dplyr)

sc <- spark_connect(master         = "local",
                    version        = "1.6.0",
                    hadoop_version = 2.4)

iris_r   <- iris %>% mutate(id = row_number())
mtcars_r <- mtcars %>% mutate(id = row_number())

iris_db   <- copy_to(sc, iris_r, name = "iris_db", overwrite = TRUE)
mtcars_db <- copy_to(sc, mtcars_r, name = "mtcars_db", overwrite = TRUE)

df <- iris_db %>% anti_join(mtcars_db, by = "id")

df

But when I try to print or collect df, I get the following error:

Error: org.apache.spark.sql.AnalysisException: 
Unsupported language features in query: SELECT * FROM `iris_db` AS `TBL_LEFT`

WHERE NOT EXISTS (
  SELECT 1 FROM `mtcars_db` AS `TBL_RIGHT`
  WHERE (`TBL_LEFT`.`id` = `TBL_RIGHT`.`id`)
)
TOK_QUERY 1, 0,51, 14
  TOK_FROM 1, 4,10, 14
    TOK_TABREF 1, 6,10, 14
      TOK_TABNAME 1, 6,6, 14
        iris_db 1, 6,6, 14
      TBL_LEFT 1, 10,10, 27
  TOK_INSERT 0, -1,51, 0
    TOK_DESTINATION 0, -1,-1, 0
      TOK_DIR 0, -1,-1, 0
        TOK_TMP_FILE 0, -1,-1, 0
    TOK_SELECT 0, 0,2, 0
      TOK_SELEXPR 0, 2,2, 0
        TOK_ALLCOLREF 0, 2,2, 0
    TOK_WHERE 3, 13,51, 6
      NOT 3, 15,51, 6
        TOK_SUBQUERY_EXPR 3, 17,51, 10
          TOK_SUBQUERY_OP 3, 17,17, 10
            EXISTS 3, 17,17, 10
          TOK_QUERY 4, 19,51, 16
            TOK_FROM 4, 27,33, 16
              TOK_TABREF 4, 29,33, 16
                TOK_TABNAME 4, 29,29, 16
                  mtcars_db 4, 29,29, 16
                TBL_RIGHT 4, 33,33, 31
            TOK_INSERT 0, -1,49, 0
              TOK_DESTINATION 0, -1,-1, 0
                TOK_DIR 0, -1,-1, 0
                  TOK_TMP_FILE 0, -1,-1, 0
              TOK_SELECT 4, 23,25, 9
                TOK_SELEXPR 4, 25,25, 9
                  1 4, 25,25, 9
              TOK_WHERE 5, 37,49, 25
                = 5, 39,49, 25
                  . 5, 40,42, 19
                    TOK_TABLE_OR_COL 5, 40,40, 9
                      TBL_LEFT 5, 40,40, 9
                    id 5, 42,42, 20
                  . 5, 46,48, 38
                    TOK_TABLE_OR_COL 5, 46,46, 27
                      TBL_RIGHT 5, 46,46, 27
                    id 5, 48,48, 39

scala.NotImplementedError: No parse rules for ASTNode type: 864, text: TOK_SUBQUERY_EXPR :
TOK_SUBQUERY_EXPR 3, 17,51, 10
  TOK_SUBQUERY_OP 3, 17,17, 10
    EXISTS 3, 17,17, 10
  TOK_QUERY 4, 19,51, 16
    TOK_FROM 4, 27,33, 16
      TOK_TABREF 4, 29,33, 16
        TOK_TABNAME 4, 29,29, 16
          mtcars_db 4, 29,29, 16
        TBL_RIGHT 4, 33,33, 31
    TOK_INSERT 0, -1,49, 0
      TOK_DESTINATION 0, -1,-1, 0
        TOK_DIR 0, -1,-1, 0
          TOK_TMP_FILE 0, -1,-1, 0
      TOK_SELECT 4, 23,25, 9
        TOK_SELEXPR 4, 25,25, 9
          1 4, 25,25, 9
      TOK_WHERE 5, 37,49, 25
        = 5, 39,49, 25
          . 5, 40,42, 19
            TOK_TABLE_OR_COL 5, 40,40, 9
              TBL_LEFT 5, 40,40, 9
            id 5, 42,42, 20
          . 5, 46,48, 38
            TOK_TABLE_OR_COL 5, 46,46, 27
              TBL_RIGHT 5, 46,46, 27
            id 5, 48,48, 39
" +

org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1721)
          ;
    at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:326)
    at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
    at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
    at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
    at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
    at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
    at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
    at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
    at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)
    at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:295)
    at org.apache.spark.sql.hive.HiveQLDialect$$anonfun$parse$1.apply(HiveContext.scala:66)
    at org.apache.spark.sql.hive.HiveQLDialect$$anonfun$parse$1.apply(HiveContext.scala:66)
    at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.apply(ClientWrapper.scala:279)
    at org.apache.spark.sql.hive.client.ClientWrapper.liftedTree1$1(ClientWrapper.scala:226)
    at org.apache.spark.sql.hive.client.ClientWrapper.retryLocked(ClientWrapper.scala:225)
    at org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:268)
    at org.apache.spark.sql.hive.HiveQLDialect.parse(HiveContext.scala:65)
    at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)
    at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)
    at org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:114)
    at org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:113)
    at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
    at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
    at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
    at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
    at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
    at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)
    at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:208)
    at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:208)
    at org.apache.spark.sql.execution.datasources.DDLParser.parse(DDLParser.scala:43)
    at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:231)
    at org.apache.spark.sql.hive.HiveContext.parseSql(HiveContext.scala:331)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817)
    at sun.reflect.GeneratedMethodAccessor36.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at sparklyr.Invoke$.invoke(invoke.scala:102)
    at sparklyr.StreamHandler$.handleMethodCall(stream.scala:97)
    at sparklyr.StreamHandler$.read(stream.scala:62)
    at sparklyr.BackendHandler.channelRead0(handler.scala:52)
    at sparklyr.BackendHandler.channelRead0(handler.scala:14)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChann

This error goes away if I switch to Spark 2.0.1. Assume for this question that I'm forced to use 1.6. Is there a supported way to perform this join?


Solution

  • A LEFT ANTI JOIN can be emulated with a FULL OUTER JOIN followed by filtering and selecting the left table's columns:

    df1 <- copy_to(sc, 
      data.frame(id=c(1, 2, 3), x=c("a", "b", "c")),
      name="df1", overwrite=TRUE)
    
    df2 <- copy_to(sc,
      data.frame(id=c(1, 3), y=c(2, -2)), 
      name="df2", overwrite=TRUE)
    
    df1 %>% 
      full_join(df2 %>% mutate(id_ = id), by="id") %>% 
      filter(is.null(id_)) %>% 
      select(one_of(colnames(df1)))
    

    If there are duplicate column names, you'll have to correct for that as well.
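
    For illustration, here is a minimal sketch assuming both tables also shared a hypothetical column x; renaming it on one side before the join keeps the final selection of df1's columns unambiguous:

    df1 %>% 
      full_join(
        # hypothetical: assumes df2 also carries a column named x
        df2 %>% rename(x_right = x) %>% mutate(id_ = id),
        by = "id") %>% 
      filter(is.null(id_)) %>% 
      select(one_of(colnames(df1)))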

    Please note that you shouldn't:

    • Use row_number() to generate global ids - it won't scale and doesn't provide the required correctness guarantees (see the sketch after this list).
    • Use copy_to on Spark data frames - it collects the data to the local node, so it won't work with large datasets.
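
    If all you need is a unique row identifier generated on the workers, here is a minimal sketch, assuming your sparklyr version provides sdf_with_unique_id() (a wrapper around Spark's monotonically_increasing_id()):

    # Assumes sdf_with_unique_id() is available in the installed sparklyr
    # version; it adds a unique, but not consecutive, id column computed
    # on the workers instead of numbering rows locally with row_number().
    iris_db   <- copy_to(sc, iris,   name = "iris_db",   overwrite = TRUE) %>%
      sdf_with_unique_id(id = "id")
    mtcars_db <- copy_to(sc, mtcars, name = "mtcars_db", overwrite = TRUE) %>%
      sdf_with_unique_id(id = "id")

    Note that these ids identify rows uniquely but are not the sequential 1..n values produced by row_number(), so they won't line up across the two tables the way the ids in the original example do.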