I have a dataset that tracks timestamped events for items' journeys from the depot to the vendor. Normally each item (tracking ID) should have three statuses, but sometimes the arrival scan at the distribution centre is late or missing, and that is my concern. The schema and the DataFrame are as follows:
sample_df = [("A001","23/10/2019 11:06:46","DEPART_FROM_FACTORY"),
("A001","23/10/2019 11:08:35","ARRIVED_AT_DISTRIBUTION"),# arrived on the same day, good compliance
("A001","25/10/2019 13:36:14","VENDOR_ACCEPTED"),
("A002","01/10/2019 13:06:46","DEPART_FROM_FACTORY"),
("A002","02/10/2019 09:08:35","ARRIVED_AT_DISTRIBUTION"),#Not arrived on the same day, bad compliance
("A002","03/10/2019 12:36:14","VENDOR_ACCEPTED"),
("A003","07/10/2019 13:06:46","DEPART_FROM_FACTORY"),
("A003","08/10/2019 09:08:35","VENDOR_ACCEPTED"), # no ARRIVED_AT_DISTRIBUTION, bad compliance
]
schema = StructType([ StructField("tracking_id", StringType(), True),
StructField("created_at", StringType(), True),
StructField("status", StringType(), True),
])
sample_df = spark.createDataFrame(sample_df, schema)
sample_df = sample_df.withColumn("created_at",to_timestamp(col("created_at"),"dd/MM/yyyy HH:mm:ss"))
+-----------+-------------------+--------------------+
|tracking_id|         created_at|              status|
+-----------+-------------------+--------------------+
|       A001|2019-10-23 11:06:46| DEPART_FROM_FACTORY|
|       A001|2019-10-23 11:08:35|ARRIVED_AT_DISTRI...|
|       A001|2019-10-25 13:36:14|     VENDOR_ACCEPTED|
|       A002|2019-10-01 13:06:46| DEPART_FROM_FACTORY|
|       A002|2019-10-02 09:08:35|ARRIVED_AT_DISTRI...|
|       A002|2019-10-03 12:36:14|     VENDOR_ACCEPTED|
|       A003|2019-10-07 13:06:46| DEPART_FROM_FACTORY|
|       A003|2019-10-08 09:08:35|     VENDOR_ACCEPTED|
+-----------+-------------------+--------------------+

root
 |-- tracking_id: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- status: string (nullable = true)
After that, I grouped the dataset by tracking ID and collected the rows into an array of structs:
from pyspark.sql.functions import collect_list, struct

agg = sample_df.groupBy("tracking_id").agg(collect_list(struct(
    col("tracking_id").alias("tracking_id"),
    col("created_at").alias("created_at"),
    col("status").alias("status")
)).alias("tbl"))
agg.show(truncate=False)
agg.printSchema()
root
|-- tracking_id: string (nullable = true)
|-- tbl: array (nullable = false)
| |-- element: struct (containsNull = false)
| | |-- tracking_id: string (nullable = true)
| | |-- created_at: timestamp (nullable = true)
| | |-- status: string (nullable = true)
Expected output: as a preliminary idea, I would like to create a UDF, or some other code, that looks into the array of structs ("tbl") and returns a Boolean in a new column, based on the absence/presence of the statuses and on whether the departure and arrival were created on the same day (a rough sketch of what I have in mind follows the table below). The end result should be:
+-----------+-------------------+-----------------------+
|tracking_id|tbl |Compliance |
+-----------+-------------------+-----------------------+
|A001 |... |True |
|A002 |... |False |
|A003 |... |False |
+-----------+-------------------+-----------------------+
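A rough, untested sketch of the kind of UDF I have in mind (I am assuming the array elements arrive in the UDF as Rows with Python datetime timestamps):

from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType

def check_compliance(tbl):
    # Map each status to its timestamp for this tracking ID
    events = {row['status']: row['created_at'] for row in tbl}
    # All three statuses must be present at all...
    if not {'DEPART_FROM_FACTORY', 'ARRIVED_AT_DISTRIBUTION', 'VENDOR_ACCEPTED'}.issubset(events):
        return False
    # ...and departure and arrival must fall on the same calendar day
    return events['DEPART_FROM_FACTORY'].date() == events['ARRIVED_AT_DISTRIBUTION'].date()

compliance_udf = udf(check_compliance, BooleanType())
agg = agg.withColumn('Compliance', compliance_udf(col('tbl')))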
Any help would be appreciated.
You don't have to use collect_list + higher-order functions. The compliance column can be calculated like this:
from pyspark.sql import types as T
from pyspark.sql import functions as F
sample_df = [
('A001', '23/10/2019 11:06:46', 'DEPART_FROM_FACTORY'),
('A001', '23/10/2019 11:08:35', 'ARRIVED_AT_DISTRIBUTION'),
('A001', '25/10/2019 13:36:14', 'VENDOR_ACCEPTED'),
('A002', '01/10/2019 13:06:46', 'DEPART_FROM_FACTORY'),
('A002', '02/10/2019 09:08:35', 'ARRIVED_AT_DISTRIBUTION'),
('A002', '03/10/2019 12:36:14', 'VENDOR_ACCEPTED'),
('A003', '07/10/2019 13:06:46', 'DEPART_FROM_FACTORY'),
('A003', '08/10/2019 09:08:35', 'VENDOR_ACCEPTED'),
]
schema = T.StructType([
T.StructField('tracking_id', T.StringType(), True),
T.StructField('created_at', T.StringType(), True),
T.StructField('status', T.StringType(), True),
])
sample_df = spark.createDataFrame(sample_df, schema)
created_at = F.to_timestamp('created_at', 'dd/MM/yyyy HH:mm:ss')
sample_df = sample_df.withColumn('created_at', created_at)
# Boolean flags for each of the three statuses
is_depart = F.col('status') == 'DEPART_FROM_FACTORY'
is_arrived = F.col('status') == 'ARRIVED_AT_DISTRIBUTION'
is_accepted = F.col('status') == 'VENDOR_ACCEPTED'

# Keep the timestamp only on rows with the matching status (null otherwise)
departed_day = F.when(is_depart, F.col('created_at'))
arrived_day = F.when(is_arrived, F.col('created_at'))

agg_cols = [
    # How many times each status occurs per tracking ID
    F.sum(is_depart.cast(T.ByteType())).alias('n_depart'),
    F.sum(is_arrived.cast(T.ByteType())).alias('n_arrived'),
    F.sum(is_accepted.cast(T.ByteType())).alias('n_accepted'),
    # Earliest departure/arrival timestamps (min skips nulls)
    F.min(departed_day).alias('departed_day'),
    F.min(arrived_day).alias('arrived_day'),
]
res = sample_df.groupBy('tracking_id').agg(*agg_cols)
all_statuses = """
n_depart > 0 AND n_arrived > 0 AND n_accepted > 0
"""
all_statuses = F.expr(all_statuses)
same_day = F.to_date('departed_day') == F.to_date('arrived_day')
compliance = all_statuses & same_day
res = res.withColumns({
'compliance': compliance
})
res.show(3, False)
# +-----------+--------+---------+----------+-------------------+-------------------+----------+
# |tracking_id|n_depart|n_arrived|n_accepted|departed_day |arrived_day |compliance|
# +-----------+--------+---------+----------+-------------------+-------------------+----------+
# |A001 |1 |1 |1 |2019-10-23 11:06:46|2019-10-23 11:08:35|true |
# |A002 |1 |1 |1 |2019-10-01 13:06:46|2019-10-02 09:08:35|false |
# |A003 |1 |0 |1 |2019-10-07 13:06:46|null |false |
# +-----------+--------+---------+----------+-------------------+-------------------+----------+
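That said, if you do want to keep the collect_list array of structs from your question, the same logic can be written with higher-order functions instead of a UDF. A sketch, assuming Spark 3.1+ (for F.exists and F.filter) and the agg DataFrame you built:

from pyspark.sql import functions as F

def has_status(arr, status):
    # True if any element of the array has the given status
    return F.exists(arr, lambda x: x['status'] == status)

def status_ts(arr, status):
    # Timestamp of the first element with the given status (null if absent)
    return F.element_at(F.filter(arr, lambda x: x['status'] == status), 1)['created_at']

tbl = F.col('tbl')
compliance = (
    has_status(tbl, 'DEPART_FROM_FACTORY')
    & has_status(tbl, 'ARRIVED_AT_DISTRIBUTION')
    & has_status(tbl, 'VENDOR_ACCEPTED')
    & (F.to_date(status_ts(tbl, 'DEPART_FROM_FACTORY'))
       == F.to_date(status_ts(tbl, 'ARRIVED_AT_DISTRIBUTION')))
)
agg.withColumn('Compliance', compliance).show(truncate=False)

The null handling works out the same way here: for A003 the arrival lookup returns null, the date comparison is null, and false AND null is false.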