Tags: apache-spark-sql, window, pyspark, partition-by

Find the row value from which minimum value was extracted over window.partitionBy in PySpark


I have a PySpark dataframe like this:

+--------+-------------+--------------+-----------------------+
|material|purchase_date|mkt_prc_usd_lb|min_mkt_prc_over_1month|
+--------+-------------+--------------+-----------------------+
|  Copper|   2019-01-09|        2.6945|                 2.6838|
|  Copper|   2019-01-23|        2.6838|                 2.6838|
|    Zinc|   2019-01-23|        1.1829|                 1.1829|
|    Zinc|   2019-06-26|        1.1918|                 1.1918|
|Aluminum|   2019-01-02|        0.8363|                 0.8342|
|Aluminum|   2019-01-09|        0.8342|                 0.8342|
|Aluminum|   2019-01-23|        0.8555|                 0.8342|
|Aluminum|   2019-04-03|        0.8461|                 0.8461|
+--------+-------------+--------------+-----------------------+
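For reference, here is a minimal sketch to reconstruct this sample DataFrame (assuming an active SparkSession named spark; the column names and values are taken from the table above):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    data = [
        ("Copper",   "2019-01-09", 2.6945, 2.6838),
        ("Copper",   "2019-01-23", 2.6838, 2.6838),
        ("Zinc",     "2019-01-23", 1.1829, 1.1829),
        ("Zinc",     "2019-06-26", 1.1918, 1.1918),
        ("Aluminum", "2019-01-02", 0.8363, 0.8342),
        ("Aluminum", "2019-01-09", 0.8342, 0.8342),
        ("Aluminum", "2019-01-23", 0.8555, 0.8342),
        ("Aluminum", "2019-04-03", 0.8461, 0.8461),
    ]
    df = spark.createDataFrame(
        data,
        ["material", "purchase_date", "mkt_prc_usd_lb", "min_mkt_prc_over_1month"])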

The last column, 'min_mkt_prc_over_1month', is the minimum 'mkt_prc_usd_lb' (3rd column) over a one-month window per material, i.e. from -15 days to +15 days around each purchase_date:

The code is:


from pyspark.sql.functions import col
from pyspark.sql.window import Window

days = lambda i: i * 86400  # convert days to seconds, since the ordering key is a unix timestamp (long)
w2 = (Window()
           .partitionBy("material")
           .orderBy(col("purchase_date").cast("timestamp").cast("long"))
           .rangeBetween(-days(15), days(15)))

Now, I want to find the 'purchase_date' on which the price was (or will be) at its minimum.

Expected Output: (from the first two rows)

+--------+-------------+--------------+-----------------------+------------------+
|material|purchase_date|mkt_prc_usd_lb|min_mkt_prc_over_1month|date_of_min_price |
+--------+-------------+--------------+-----------------------+------------------+
|  Copper|   2019-01-09|        2.6945|                 2.6838|        2019-01-23|
|  Copper|   2019-01-23|        2.6838|                 2.6838|        2019-01-23|
+--------+-------------+--------------+-----------------------+------------------+

Solution

  • Try this. We can create a column that holds the purchase_date wherever the two prices are equal and Null otherwise, then take first with ignoreNulls=True over that new column using the same window w2.

    from pyspark.sql.functions import *
    from pyspark.sql.window import Window
    
    days= lambda i: i * 86400
    w2 = (Window()
               .partitionBy("material")
               .orderBy(col("purchase_date").cast("timestamp").cast("long"))
               .rangeBetween(-days(15), days(15)))
    
    
    df.withColumn("first",\
                  expr("""IF(mkt_prc_usd_lb=min_mkt_prc_over_1month,purchase_date,null)"""))\
      .withColumn("date_of_min_price", first("first", True).over(w2)).drop("first")\
      .show()
    
    #+--------+-------------+--------------+-----------------------+-----------------+
    #|material|purchase_date|mkt_prc_usd_lb|min_mkt_prc_over_1month|date_of_min_price|
    #+--------+-------------+--------------+-----------------------+-----------------+
    #|  Copper|   2019-01-09|        2.6945|                 2.6838|       2019-01-23|
    #|  Copper|   2019-01-23|        2.6838|                 2.6838|       2019-01-23|
    #|    Zinc|   2019-01-23|        1.1829|                 1.1829|       2019-01-23|
    #|    Zinc|   2019-06-26|        1.1918|                 1.1918|       2019-06-26|
    #|Aluminum|   2019-01-02|        0.8363|                 0.8342|       2019-01-09|
    #|Aluminum|   2019-01-09|        0.8342|                 0.8342|       2019-01-09|
    #|Aluminum|   2019-01-23|        0.8555|                 0.8342|       2019-01-09|
    #|Aluminum|   2019-04-03|        0.8461|                 0.8461|       2019-04-03|
    #+--------+-------------+--------------+-----------------------+-----------------+
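  • A possible alternative, not part of the original answer: take the minimum of a struct of (price, date) over the same window. Struct comparison is field-by-field, so the row with the lowest price wins and carries its purchase_date along, which lets you get the date of the minimum in one pass. A sketch, assuming the same df and w2 as above:

    from pyspark.sql.functions import col, struct
    from pyspark.sql.functions import min as min_

    # min over a struct compares mkt_prc_usd_lb first, then purchase_date,
    # so the struct with the lowest price in the window is returned
    df2 = (df.withColumn("min_struct",
                         min_(struct("mkt_prc_usd_lb", "purchase_date")).over(w2))
             .withColumn("date_of_min_price", col("min_struct.purchase_date"))
             .drop("min_struct"))
    df2.show()

    Note that if two rows in a window share the minimum price, this picks the earlier purchase_date among them.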