
How to select rows that are not present in another dataframe with pyspark 2.1.0?


Env

  • pyspark 2.1.0

Context

I have two dataframes with the following structures:

dataframe 1:

id | ... | distance

dataframe 2:

id | ... | distance | other calculated values

The second dataframe is created from a filter on dataframe 1. This filter selects, from dataframe 1, only the rows with a distance <= 30.0.

Note that dataframe 1 can contain the same ID on multiple rows.

Problem

I need to select from dataframe 1 the rows whose ID does not appear in dataframe 2.

In other words, the goal is to select the rows whose ID has no distance lower than or equal to 30.0.
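
For reference, the same requirement can also be expressed with a per-ID aggregation instead of an anti join. The sketch below is only illustrative: it assumes a dataframe named df1 standing for dataframe 1 (the name is hypothetical) and pyspark.sql.functions imported as F.

from pyspark.sql import functions as F

# IDs whose smallest distance is still greater than 30.0
ids_without_thirty = df1.groupBy("id") \
    .agg(F.min("distance").alias("min_distance")) \
    .filter(F.col("min_distance") > 30.0) \
    .select("id")

# keep every row of df1 belonging to those IDs
rows_without_thirty = df1.join(ids_without_thirty, "id", "inner")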

Tested solution

I have tried the leftanti join which, according not to the official doc but to sources on the Internet (because, hey, why would they explain it?), selects all rows from df1 that are not present in df2:

distinct_id_thirty = within_thirty_km \
    .select("id") \
    .distinct()
not_within_thirty_km = data_with_straight_distance.join(
        distinct_id_thirty,
        "id",
        "leftanti")

Where:

  • within_thirty_km is a dataframe resulting from the filter filter(col("distance") <= 30.0) on data_with_straight_distance
  • data_with_straight_distance is a dataframe containing all the data.
  • distinct_id_thirty is a dataframe containing the distinct list of IDs from the dataframe within_thirty_km.

Question

The above returns rows where the distance is below 30, so I assume I am doing something wrong:

  • What am I doing wrong here?
  • Is this the right way to solve this problem? If not, how should I proceed?

Edit:

Here is a minimal example of what I expect:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# reuses an existing session (e.g. the pyspark shell) or creates one
spark = SparkSession.builder.getOrCreate()

data = [
    ("1", 15),
    ("1", 35),
    ("2", 15),
    ("2", 30),
    ("3", 35)]

data = spark.createDataFrame(data, ['id', 'distance'])

data.show()

thirty = data.filter(col("distance") <= 30)

dist_thirty = thirty.select("id").distinct()

not_in_thirty = data.join(dist_thirty, "id", "left_anti")

print("thirty")
thirty.show()

print("distinst thirty")
dist_thirty.show()

print("not_in_thirty")
not_in_thirty.show()

Output:

+---+--------+
| id|distance|
+---+--------+
|  3|      35|
+---+--------+

But I do get rows with distance <= 30 when running on my actual data.


Solution

  • "leftanti" should be replaced by "left_anti" following the documentation on: https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.join