Tags: date, apache-spark, window-functions, pyspark

Closest Date looking from One Column to another in PySpark Dataframe


I have a PySpark dataframe that lists the buying price of each Commodity, but there is no data on when the Commodity was bought; I only have a one-year date range.

+---------+------------+----------------+----------------+
|Commodity| BuyingPrice|Date_Upper_limit|Date_lower_limit|
+---------+------------+----------------+----------------+
|    Apple|           5|      2020-07-04|      2019-07-03|
|   Banana|           3|      2020-07-03|      2019-07-02|
|   Banana|           4|      2019-10-02|      2018-10-01|
|    Apple|           6|      2020-01-20|      2019-01-19|
|   Banana|         3.5|      2019-08-17|      2018-08-16|
+---------+------------+----------------+----------------+

I have another PySpark dataframe with the market price of each commodity by date.

+----------+----------+------------+
|      Date| Commodity|Market Price|
+----------+----------+------------+
|2020-07-01|     Apple|           3|
|2020-07-01|    Banana|           3|
|2020-07-02|     Apple|           4|
|2020-07-02|    Banana|         2.5|
|2020-07-03|     Apple|           7|
|2020-07-03|    Banana|           4|
+----------+----------+------------+
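
For reference, a minimal sketch of how these two dataframes could be built (assuming an active SparkSession named spark, and the names df1 and df2 that the answer below uses):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Buying prices with the one-year date window (df1 is an assumed name).
    # Dates are kept as ISO-formatted strings, which compare correctly lexicographically.
    df1 = spark.createDataFrame(
        [("Apple", 5.0, "2020-07-04", "2019-07-03"),
         ("Banana", 3.0, "2020-07-03", "2019-07-02"),
         ("Banana", 4.0, "2019-10-02", "2018-10-01"),
         ("Apple", 6.0, "2020-01-20", "2019-01-19"),
         ("Banana", 3.5, "2019-08-17", "2018-08-16")],
        ["Commodity", "BuyingPrice", "Date_Upper_limit", "Date_lower_limit"])

    # Daily market prices per commodity (df2 is an assumed name).
    df2 = spark.createDataFrame(
        [("2020-07-01", "Apple", 3.0),
         ("2020-07-01", "Banana", 3.0),
         ("2020-07-02", "Apple", 4.0),
         ("2020-07-02", "Banana", 2.5),
         ("2020-07-03", "Apple", 7.0),
         ("2020-07-03", "Banana", 4.0)],
        ["Date", "Commodity", "Market Price"])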

For each row, I want to find the date closest to the Upper limit on which the Market Price (MP) of that commodity was less than or equal to the Buying Price (BP).

Expected output (for the top 2 rows):

+---------+------------+----------------+----------------+--------------------------------+
|Commodity| BuyingPrice|Date_Upper_limit|Date_lower_limit|Closest Date to UL when MP <= BP|
+---------+------------+----------------+----------------+--------------------------------+
|    Apple|           5|      2020-07-04|      2019-07-03|                      2020-07-02|
|   Banana|           3|      2020-07-03|      2019-07-02|                      2020-07-02|
+---------+------------+----------------+----------------+--------------------------------+

Even though Apple was much cheaper on 2020-07-01 ($3), 2020-07-02 is the first date, going backwards from the Upper Limit (UL), on which MP <= BP. So I selected 2020-07-02.

How can I look backwards like this to fill in the probable buying date?


Solution

  • Try this with a conditional join and a window function:

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    w = Window().partitionBy("Commodity")

    # first dataframe shown is df1, second is df2
    df1.join(df2.withColumnRenamed("Commodity", "Commodity1"),
             F.expr("`Market Price` <= BuyingPrice and Date < Date_Upper_limit and Commodity == Commodity1"))\
       .drop("Market Price", "Commodity1")\
       .withColumn("max", F.max("Date").over(w))\
       .filter("max == Date").drop("max")\
       .withColumnRenamed("Date", "Closest Date to UL when MP <= BP")\
       .show()
    
    #+---------+-----------+----------------+----------------+--------------------------------+
    #|Commodity|BuyingPrice|Date_Upper_limit|Date_lower_limit|Closest Date to UL when MP <= BP|
    #+---------+-----------+----------------+----------------+--------------------------------+
    #|   Banana|        3.0|      2020-07-03|      2019-07-02|                      2020-07-02|
    #|    Apple|        5.0|      2020-07-04|      2019-07-03|                      2020-07-02|
    #+---------+-----------+----------------+----------------+--------------------------------+
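
  • If df1 could contain several rows per commodity with different upper limits, an alternative is to aggregate per df1 row with a groupBy instead of the window. A sketch, assuming the same df1/df2 names as above (the left join keeps commodities that never satisfy MP <= BP, with a null closest date):

    from pyspark.sql import functions as F

    # Aggregate per original df1 row instead of relying on a window
    # partitioned only by Commodity.
    df1.join(df2.withColumnRenamed("Commodity", "Commodity1"),
             F.expr("`Market Price` <= BuyingPrice and Date < Date_Upper_limit and Commodity == Commodity1"),
             "left")\
       .groupBy("Commodity", "BuyingPrice", "Date_Upper_limit", "Date_lower_limit")\
       .agg(F.max("Date").alias("Closest Date to UL when MP <= BP"))\
       .show()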