python · arrays · pyspark · struct · timestamp

PySpark: Develop functions by interacting with elements from array column of struct based on time and presence/absence of certain strings


I have a dataset that tracks the timestamped journey of items from the depot to the vendor. Normally each item (tracking ID) should have three statuses, but sometimes the arrival scan at the distribution centre is late or missing entirely, and that is my concern. The schema and the DataFrame are as follows:

from pyspark.sql.functions import col, collect_list, struct, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType

sample_df = [("A001","23/10/2019  11:06:46","DEPART_FROM_FACTORY"),
              ("A001","23/10/2019  11:08:35","ARRIVED_AT_DISTRIBUTION"),# arrived on the same day, good compliance
              ("A001","25/10/2019  13:36:14","VENDOR_ACCEPTED"),

              ("A002","01/10/2019  13:06:46","DEPART_FROM_FACTORY"),
              ("A002","02/10/2019  09:08:35","ARRIVED_AT_DISTRIBUTION"),#Not arrived on the same day, bad compliance
              ("A002","03/10/2019  12:36:14","VENDOR_ACCEPTED"), 

              ("A003","07/10/2019  13:06:46","DEPART_FROM_FACTORY"),
              ("A003","08/10/2019  09:08:35","VENDOR_ACCEPTED"), # no ARRIVED_AT_DISTRIBUTION, bad compliance
              ]

schema = StructType([ StructField("tracking_id", StringType(), True),
                      StructField("created_at", StringType(), True),
                      StructField("status", StringType(), True),
                     ])

sample_df = spark.createDataFrame(sample_df, schema)
sample_df = sample_df.withColumn("created_at",to_timestamp(col("created_at"),"dd/MM/yyyy  HH:mm:ss"))
+-----------+-------------------+--------------------+
|tracking_id|         created_at|              status|
+-----------+-------------------+--------------------+
|       A001|2019-10-23 11:06:46| DEPART_FROM_FACTORY|
|       A001|2019-10-23 11:08:35|ARRIVED_AT_DISTRI...|
|       A001|2019-10-25 13:36:14|     VENDOR_ACCEPTED|
|       A002|2019-10-01 13:06:46| DEPART_FROM_FACTORY|
|       A002|2019-10-02 09:08:35|ARRIVED_AT_DISTRI...|
|       A002|2019-10-03 12:36:14|     VENDOR_ACCEPTED|
|       A003|2019-10-07 13:06:46| DEPART_FROM_FACTORY|
|       A003|2019-10-08 09:08:35|     VENDOR_ACCEPTED|
+-----------+-------------------+--------------------+

root
 |-- tracking_id: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- status: string (nullable = true)

After that, I grouped by tracking ID and collected each group into an array of structs:

agg = sample_df.groupBy("tracking_id").agg(collect_list(struct(
                col("tracking_id").alias("tracking_id"),
                col("created_at").alias("created_at"),
                col("status").alias("status")
            )).alias("tbl"))



agg.show(truncate=False) ; agg.printSchema()
root
 |-- tracking_id: string (nullable = true)
 |-- tbl: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- tracking_id: string (nullable = true)
 |    |    |-- created_at: timestamp (nullable = true)
 |    |    |-- status: string (nullable = true)

Expected Output: As a preliminary idea, I would like to create a UDF, or develop some other code, that looks into the array of structs ("tbl") and returns a Boolean in a new column, based on the presence/absence of the statuses and on whether the relevant scans were created on the same day. The end result should be:

+-----------+-------------------+-----------------------+
|tracking_id|tbl                |Compliance             |
+-----------+-------------------+-----------------------+
|A001       |...                |True                   |
|A002       |...                |False                  |
|A003       |...                |False                  |
+-----------+-------------------+-----------------------+
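In plain Python, the rule I'm describing looks roughly like this (the helper name is illustrative; this is essentially what a UDF over "tbl" would have to do per tracking ID):

```python
from datetime import datetime

def is_compliant(events):
    """events: list of (created_at, status) tuples for one tracking_id.

    Compliant means all three statuses are present and the depart and
    arrival scans fall on the same calendar day."""
    by_status = {status: created_at for created_at, status in events}
    required = {'DEPART_FROM_FACTORY', 'ARRIVED_AT_DISTRIBUTION', 'VENDOR_ACCEPTED'}
    if not required <= by_status.keys():
        return False  # a status is missing entirely
    return (by_status['DEPART_FROM_FACTORY'].date()
            == by_status['ARRIVED_AT_DISTRIBUTION'].date())

fmt = '%d/%m/%Y %H:%M:%S'
a001 = [(datetime.strptime('23/10/2019 11:06:46', fmt), 'DEPART_FROM_FACTORY'),
        (datetime.strptime('23/10/2019 11:08:35', fmt), 'ARRIVED_AT_DISTRIBUTION'),
        (datetime.strptime('25/10/2019 13:36:14', fmt), 'VENDOR_ACCEPTED')]
print(is_compliant(a001))  # True
```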

Any help would be appreciated.


Solution

  • You don't have to use collect_list plus higher-order functions. The compliance column can be calculated like this:

    from pyspark.sql import types as T
    from pyspark.sql import functions as F
    
    sample_df = [
        ('A001', '23/10/2019  11:06:46', 'DEPART_FROM_FACTORY'),
        ('A001', '23/10/2019  11:08:35', 'ARRIVED_AT_DISTRIBUTION'),
        ('A001', '25/10/2019  13:36:14', 'VENDOR_ACCEPTED'),
    
        ('A002', '01/10/2019  13:06:46', 'DEPART_FROM_FACTORY'),
        ('A002', '02/10/2019  09:08:35', 'ARRIVED_AT_DISTRIBUTION'),
        ('A002', '03/10/2019  12:36:14', 'VENDOR_ACCEPTED'),
    
        ('A003', '07/10/2019  13:06:46', 'DEPART_FROM_FACTORY'),
        ('A003', '08/10/2019  09:08:35', 'VENDOR_ACCEPTED'),
    ]
    schema = T.StructType([
        T.StructField('tracking_id', T.StringType(), True),
        T.StructField('created_at', T.StringType(), True),
        T.StructField('status', T.StringType(), True),
    ])
    sample_df = spark.createDataFrame(sample_df, schema)
    created_at = F.to_timestamp('created_at', 'dd/MM/yyyy  HH:mm:ss')
    sample_df = sample_df.withColumn('created_at', created_at)
    
    is_depart = F.col('status') == 'DEPART_FROM_FACTORY'
    is_arrived = F.col('status') == 'ARRIVED_AT_DISTRIBUTION'
    is_accepted = F.col('status') == 'VENDOR_ACCEPTED'
    
    departed_day = F.when(is_depart, F.col('created_at'))
    arrived_day = F.when(is_arrived, F.col('created_at'))
    agg_cols = [
        F.sum(is_depart.cast(T.ByteType())).alias('n_depart'),
        F.sum(is_arrived.cast(T.ByteType())).alias('n_arrived'),
        F.sum(is_accepted.cast(T.ByteType())).alias('n_accepted'),
    
        F.min(departed_day).alias('departed_day'),
        F.min(arrived_day).alias('arrived_day')
    ]
    res = sample_df.groupBy('tracking_id').agg(*agg_cols)
    
    all_statuses = """
    n_depart > 0 AND n_arrived > 0 AND n_accepted > 0
    """
    all_statuses = F.expr(all_statuses)
    same_day = F.to_date('departed_day') == F.to_date('arrived_day')
    
    compliance = all_statuses & same_day
    # withColumns requires Spark 3.3+; on older versions use
    # res.withColumn('compliance', compliance) instead.
    res = res.withColumns({
        'compliance': compliance
    })
    res.show(3, False)
    
    # +-----------+--------+---------+----------+-------------------+-------------------+----------+
    # |tracking_id|n_depart|n_arrived|n_accepted|departed_day       |arrived_day        |compliance|
    # +-----------+--------+---------+----------+-------------------+-------------------+----------+
    # |A001       |1       |1        |1         |2019-10-23 11:06:46|2019-10-23 11:08:35|true      |
    # |A002       |1       |1        |1         |2019-10-01 13:06:46|2019-10-02 09:08:35|false     |
    # |A003       |1       |0        |1         |2019-10-07 13:06:46|null               |false     |
    # +-----------+--------+---------+----------+-------------------+-------------------+----------+