
Vertical check to flag only specified row in PySpark


I need to do a vertical check on my dataset in PySpark to flag only the rows that match a certain condition.

In detail: I have to flag only the "PURCHASE + Seller" rows that are preceded by a "SALE + Customer" row (see the example below).

Example:

Input

id  order_type  Initiative  date
 1  PURCHASE    Seller      2022-02-11
 1  PURCHASE    Seller      2022-02-10
 1  PURCHASE    Seller      2022-02-09
 1  SALE        Customer    2022-02-08
 1  SALE        Customer    2022-02-07
 1  SALE        Customer    2022-02-06
 1  PURCHASE    Seller      2022-02-05
 1  SALE        Customer    2022-02-04
 1  PURCHASE    Seller      2022-02-03   (note this row)
 2  PURCHASE    Customer    2022-02-11

Output

id  order_type  Initiative  date        flag  difference (in days)
 1  PURCHASE    Seller      2022-02-11  1     3
 1  PURCHASE    Seller      2022-02-10  1     2
 1  PURCHASE    Seller      2022-02-09  1     1
 1  SALE        Customer    2022-02-08  0
 1  SALE        Customer    2022-02-07  0
 1  SALE        Customer    2022-02-06  0
 1  PURCHASE    Seller      2022-02-05  1     1
 1  SALE        Customer    2022-02-04  0
 1  PURCHASE    Seller      2022-02-03  0     (condition is not satisfied)
 2  PURCHASE    Customer    2022-02-11  0


Solution

  • Here's my implementation:

    from pyspark.sql import functions as F
    from pyspark.sql.types import *
    from pyspark.sql import Window
    
    df = spark.createDataFrame(
        [
            ("1", "PURCHASE", "Seller", "2022-02-11"),
            ("1", "PURCHASE", "Seller", "2022-02-10"),
            ("1", "PURCHASE", "Seller", "2022-02-09"),
            ("1", "SALE", "Customer", "2022-02-08"),
            ("1", "SALE", "Customer", "2022-02-07"),
            ("1", "SALE", "Customer", "2022-02-06"),
            ("1", "PURCHASE", "Seller", "2022-02-05"),
            ("1", "SALE", "Customer", "2022-02-04"),
            ("1", "PURCHASE", "Seller", "2022-02-03"),
            ("2", "PURCHASE", "Customer", "2022-02-11"),
        ],
        ["id", "order_type", "Initiative", "date"],
    )
    # cast the string dates to DateType so the date comparisons below behave correctly
    df = df.withColumn("date", F.col("date").cast(DateType()))
    df.show()

    # keep only the "SALE + Customer" rows: these are the candidate preceding events
    sale_df = df.filter((F.lower(F.col("order_type")) == "sale") & (F.lower(F.col("Initiative")) == "customer"))
    sale_df.show()
    
    # rank the joined sale rows per original row so that the most recent
    # preceding sale (largest s.date) gets row_num = 1
    row_window = Window.partitionBy(
        "df.id",
        "df.order_type",
        "df.Initiative",
        "df.date",
    ).orderBy(F.col("s.date").desc())
    
    final_df = (
        df.alias("df")
        # left join each row with every earlier "SALE + Customer" row,
        # but only when the left-hand row is a "PURCHASE + Seller"
        .join(
            sale_df.alias("s"),
            on=(
                (F.col("s.date") < F.col("df.date"))
                & (F.lower(F.col("df.order_type")) == "purchase")
                & (F.lower(F.col("df.Initiative")) == "seller")
            ),
            how="left",
        )
        # keep only the closest preceding sale for each original row
        .withColumn("row_num", F.row_number().over(row_window))
        .filter(F.col("row_num") == 1)
        # days between the purchase and the preceding sale (null when there is no match)
        .withColumn("day_diff", F.datediff(F.col("df.date"), F.col("s.date")))
        # flag = 1 when a preceding "SALE + Customer" exists, 0 otherwise
        .withColumn(
            "flag",
            F.when(
                F.col("s.id").isNull(),
                F.lit(0),
            ).otherwise(F.lit(1)),
        )
        .select("df.*", "flag", "day_diff")
        .orderBy(F.col("df.id").asc(), F.col("df.date").desc())
    )
    final_df.show()
    

    OUTPUTS:

    +---+----------+----------+----------+
    | id|order_type|Initiative|      date|
    +---+----------+----------+----------+
    |  1|  PURCHASE|    Seller|2022-02-11|
    |  1|  PURCHASE|    Seller|2022-02-10|
    |  1|  PURCHASE|    Seller|2022-02-09|
    |  1|      SALE|  Customer|2022-02-08|
    |  1|      SALE|  Customer|2022-02-07|
    |  1|      SALE|  Customer|2022-02-06|
    |  1|  PURCHASE|    Seller|2022-02-05|
    |  1|      SALE|  Customer|2022-02-04|
    |  1|  PURCHASE|    Seller|2022-02-03|
    |  2|  PURCHASE|  Customer|2022-02-11|
    +---+----------+----------+----------+
    
    +---+----------+----------+----------+
    | id|order_type|Initiative|      date|
    +---+----------+----------+----------+
    |  1|      SALE|  Customer|2022-02-08|
    |  1|      SALE|  Customer|2022-02-07|
    |  1|      SALE|  Customer|2022-02-06|
    |  1|      SALE|  Customer|2022-02-04|
    +---+----------+----------+----------+
    

    final output:

    +---+----------+----------+----------+----+--------+
    | id|order_type|Initiative|      date|flag|day_diff|
    +---+----------+----------+----------+----+--------+
    |  1|  PURCHASE|    Seller|2022-02-11|   1|       3|
    |  1|  PURCHASE|    Seller|2022-02-10|   1|       2|
    |  1|  PURCHASE|    Seller|2022-02-09|   1|       1|
    |  1|      SALE|  Customer|2022-02-08|   0|    null|
    |  1|      SALE|  Customer|2022-02-07|   0|    null|
    |  1|      SALE|  Customer|2022-02-06|   0|    null|
    |  1|  PURCHASE|    Seller|2022-02-05|   1|       1|
    |  1|      SALE|  Customer|2022-02-04|   0|    null|
    |  1|  PURCHASE|    Seller|2022-02-03|   0|    null|
    |  2|  PURCHASE|  Customer|2022-02-11|   0|    null|
    +---+----------+----------+----------+----+--------+
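
    For reference, a minimal alternative sketch (not part of the answer above): the same flag and day_diff can be computed without the self-join by using a window function to look up the date of the most recent preceding "SALE + Customer" row. The names w, last_sale_date, is_purchase_seller and alt_df are illustrative, and partitioning by "id" is an assumption that also limits the lookback to rows with the same id, which the join-based version does not do.

    from pyspark.sql import functions as F
    from pyspark.sql import Window

    # frame of all rows strictly before the current row, per id, ordered by date
    w = (
        Window.partitionBy("id")
        .orderBy(F.col("date").asc())
        .rowsBetween(Window.unboundedPreceding, -1)
    )

    # date of the latest preceding "SALE + Customer" row; null if there is none
    last_sale_date = F.max(
        F.when(
            (F.lower(F.col("order_type")) == "sale")
            & (F.lower(F.col("Initiative")) == "customer"),
            F.col("date"),
        )
    ).over(w)

    is_purchase_seller = (F.lower(F.col("order_type")) == "purchase") & (
        F.lower(F.col("Initiative")) == "seller"
    )

    alt_df = (
        df.withColumn("last_sale_date", last_sale_date)
        # flag = 1 only for a PURCHASE + Seller row with at least one earlier SALE + Customer row
        .withColumn(
            "flag",
            F.when(is_purchase_seller & F.col("last_sale_date").isNotNull(), F.lit(1)).otherwise(F.lit(0)),
        )
        # day difference only where the flag is set, null otherwise
        .withColumn(
            "day_diff",
            F.when(F.col("flag") == 1, F.datediff(F.col("date"), F.col("last_sale_date"))),
        )
        .drop("last_sale_date")
        .orderBy(F.col("id").asc(), F.col("date").desc())
    )
    alt_df.show()

    On the sample data this should produce the same flag and day_diff values as the join-based version.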