
Why doesn't left_anti join work as expected in PySpark?


In a DataFrame, I'm trying to identify the rows whose value in column C2 does not appear in column C1 of any other row. I tried the following code:

in_df = sqlContext.createDataFrame([[1,None,'A'],[2,1,'B'],[3,None,'C'],[4,11,'D']],['C1','C2','C3'])
in_df.show()
    +---+----+---+
    | C1|  C2| C3|
    +---+----+---+
    |  1|null|  A|
    |  2|   1|  B|
    |  3|null|  C|
    |  4|  11|  D|
    +---+----+---+
filtered = in_df.filter(in_df.C2.isNotNull())
filtered.show()
    +---+---+---+
    | C1| C2| C3|
    +---+---+---+
    |  2|  1|  B|
    |  4| 11|  D|
    +---+---+---+

Now, applying a left_anti join should return only row 4; however, I also get row 2:

filtered.join(in_df,(in_df.C1 == filtered.C2), 'left_anti').show()
    +---+---+---+
    | C1| C2| C3|
    +---+---+---+
    |  2|  1|  B|
    |  4| 11|  D|
    +---+---+---+

If I 'materialize' the filtered DF, the result is as expected:

filtered = filtered.toDF(*filtered.columns)
filtered.join(in_df,(in_df.C1 == filtered.C2), 'left_anti').show()
    +---+---+---+
    | C1| C2| C3|
    +---+---+---+
    |  4| 11|  D|
    +---+---+---+

Why is this .toDF needed?


Solution

  • in_df.C1 actually refers to a column of filtered, as the following code shows:

    in_df = sqlContext.createDataFrame([[1,None,'A'],[2,1,'B'],[3,None,'C'],[4,11,'D']],['C1','C2','C3'])
    filtered = in_df.filter(in_df.C2.isNotNull()).select("C2")
    filtered.join(in_df,(in_df.C1 == filtered.C2), 'left_anti').show()
    

    Py4JJavaError: An error occurred while calling o699.join. :
    org.apache.spark.sql.AnalysisException: cannot resolve 'in_df.C1' given input columns: [C2, C1, C2, C3];;
    'Join LeftAnti, ('in_df.C1 = 'filtered.C2)
    :- Project [C2#891L]
    :  +- Filter isnotnull(C2#891L)
    :     +- LogicalRDD [C1#890L, C2#891L, C3#892]
    +- LogicalRDD [C1#900L, C2#901L, C3#902]

    So when joining the two DataFrames, you are actually using the condition filtered.C1 == filtered.C2:

    filtered = in_df.filter(in_df.C2.isNotNull())
    filtered.join(in_df,(filtered.C1 == filtered.C2), 'left_anti').show()
    
        +---+---+---+
        | C1| C2| C3|
        +---+---+---+
        |  2|  1|  B|
        |  4| 11|  D|
        +---+---+---+
    

    You may have created a new DataFrame, but its columns can still be referred to as in_df.Ci. To make sure you're referring to the right DataFrame, you can use aliases:

    import pyspark.sql.functions as psf
    filtered.alias("filtered").join(in_df.alias("in_df"),(psf.col("in_df.C1") == psf.col("filtered.C2")), 'left_anti').show()
    
        +---+---+---+
        | C1| C2| C3|
        +---+---+---+
        |  4| 11|  D|
        +---+---+---+
    

    The best way to deal with column name ambiguities is to avoid them from the start, by renaming columns or aliasing your DataFrames, as in the sketch below.
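
    For instance, here is a minimal sketch of the renaming approach, reusing the same in_df as above (the names K1, K2, K3 are illustrative choices). toDF re-aliases the columns of the lookup copy, so the two sides of the self-join no longer share column references:

    # give the lookup copy fresh column names so filtered's names stay unambiguous
    lookup = in_df.toDF("K1", "K2", "K3")
    filtered = in_df.filter(in_df.C2.isNotNull())
    # keep filtered rows whose C2 value matches no C1 in the lookup copy
    filtered.join(lookup, filtered.C2 == lookup.K1, 'left_anti').show()

    Like the .toDF fix in the question, this should keep only row 4.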