I am working with PySpark Structured Streaming to compute a cumulative aggregation over streaming data. I use MongoDB and Kafka as the sources for my Spark read stream. I have tried foreachBatch and foreach in the Spark sink, but neither met my requirements. I am also not sure which outputMode of the Spark sink best fits my requirements, and I am clueless about how to move forward from here. Below is the read-stream DataFrame I have transformed so far from the source data.
+--------------------+-------+--------------+-------------------+-------------------+-------------------+----------+----------+----------------+
|                  id|user_id|profile_viewed|         created_at|         updated_at|  settings_end_date|start_time|  end_time|total_time_saved|
+--------------------+-------+--------------+-------------------+-------------------+-------------------+----------+----------+----------------+
|6500d4bce7c328000...|   2533|           143|2023-09-12 21:14:35|2023-09-15 09:01:48|2023-09-13 01:29:18|1694533475|1694548758|           19573|
|64fe9c7a39869c000...|   2660|            41|2023-09-11 04:50:02|2023-09-15 08:41:41|2023-09-11 04:53:56|1694388002|1694388236|            1464|
|650143cd2d1886000...|   2660|             6|2023-09-13 05:08:29|2023-09-15 08:41:34|2023-09-13 05:10:05|1694561909|1694562005|             276|
|6501453f6b71b8000...|   2660|             4|2023-09-13 05:14:39|2023-09-15 08:41:05|2023-09-13 05:17:06|1694562279|1694562426|             267|
|65014196ff8372000...|   2660|            70|2023-09-13 04:59:02|2023-09-15 08:40:44|2023-09-13 05:04:39|1694561342|1694561679|            2437|
|64e80cf5cfeb0e000...|   2655|            18|2023-09-15 05:07:49|2023-09-15 05:29:49|2023-09-15 05:20:49|1694734669|1694735449|            1320|
|650d5a497266eb2ca...|   2655|             2|2023-09-15 05:10:49|2023-09-15 05:14:49|2023-09-15 05:10:49|1694734849|1694734849|              60|
|6503b61c660ef2000...|   2672|             5|2023-09-15 01:40:44|2023-09-15 01:55:50|2023-09-15 01:55:50|1694722244|1694723150|            1056|
|6503b5621c53eb000...|   2672|             9|2023-09-15 01:37:38|2023-09-15 01:39:34|2023-09-15 01:39:34|1694722058|1694722174|             386|
|6503a7d81c53eb000...|   2515|            32|2023-09-15 00:39:52|2023-09-15 00:40:45|2023-09-15 00:40:45|1694718592|1694718645|            1013|
|6500c6e6602140000...|   1996|            98|2023-09-12 20:15:34|2023-09-14 22:54:31|2023-09-12 23:12:21|1694529934|1694540541|           13547|
|64fefdc4540b67000...|   2658|            65|2023-09-11 11:45:08|2023-09-14 20:04:15|2023-09-11 16:15:35|1694412908|1694429135|           18177|
|64f8ef0aec3e46000...|   1996|            41|2023-09-06 21:28:42|2023-09-14 18:53:06|2023-09-07 00:22:47|1694015922|1694026367|           11675|
+--------------------+-------+--------------+-------------------+-------------------+-------------------+----------+----------+----------------+
Please note: in this DataFrame the id and user_id columns are not unique. There are duplicate ids and duplicate user ids.
Here is what I am looking for:
First, I want to keep only one row per id: the most recently updated one. In other words, if there are 5 records with the same id, I only want to keep the record whose updated_at value is the latest of those 5. It is also important that when a new record arrives with an id that is already present in the DataFrame, the existing row is updated with the latest one. I hope this makes sense.
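The first requirement amounts to keeping per-id state that is replaced only when a strictly newer updated_at arrives. Below is a minimal plain-Python sketch of that rule, not PySpark code. The helper names `merge_latest` and `ts`, the ids `id-1`, and the values 1000 and 1500 are hypothetical and chosen for illustration. In Spark, this state would have to live in a stateful operator or an external store.

```python
from datetime import datetime


def ts(text):
    """Parse a timestamp in the format shown in the table above."""
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")


def merge_latest(state, batch):
    """Hypothetical helper: fold a batch of rows into `state` (id -> row),
    keeping only the row with the latest updated_at for each id."""
    for row in batch:
        current = state.get(row["id"])
        # Keep the incoming row only if the id is new or the row is strictly newer.
        if current is None or row["updated_at"] > current["updated_at"]:
            state[row["id"]] = row
    return state


state = {}
# First batch: two versions of the same (hypothetical) id arrive together.
merge_latest(state, [
    {"id": "id-1", "user_id": 2660, "updated_at": ts("2023-09-15 08:41:41"), "total_time_saved": 1464},
    {"id": "id-1", "user_id": 2660, "updated_at": ts("2023-09-15 08:30:00"), "total_time_saved": 1000},
])
print(state["id-1"]["total_time_saved"])  # 1464

# A later batch brings a newer version of id-1, which replaces the stored row.
merge_latest(state, [
    {"id": "id-1", "user_id": 2660, "updated_at": ts("2023-09-15 09:00:00"), "total_time_saved": 1500},
])
print(state["id-1"]["total_time_saved"])  # 1500
```

When two versions have an equal updated_at, this sketch keeps the one it saw first. That tie-breaking choice is an assumption.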
Second, on the deduplicated DataFrame from the first step, I want a cumulative aggregation: group by user_id and sum total_time_saved.
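Combining the two requirements means that when an id's row is replaced, its old total_time_saved must be subtracted from the user's running sum before the new value is added. The following is a minimal plain-Python sketch of that incremental bookkeeping, not a Spark API. The class `UserTotals`, the ids, and the values 1500 and 9999 are hypothetical.

```python
from collections import defaultdict


class UserTotals:
    """Hypothetical incremental aggregator: keeps the latest row per id and a
    running sum of total_time_saved per user_id over those latest rows."""

    def __init__(self):
        self.latest = {}                # id -> (updated_at, user_id, total_time_saved)
        self.totals = defaultdict(int)  # user_id -> sum over the latest rows

    def apply(self, row_id, user_id, updated_at, time_saved):
        old = self.latest.get(row_id)
        if old is not None and updated_at <= old[0]:
            return  # stale or repeated version of this id: ignore it
        if old is not None:
            # Retract the superseded row's contribution before adding the new one.
            self.totals[old[1]] -= old[2]
        self.latest[row_id] = (updated_at, user_id, time_saved)
        self.totals[user_id] += time_saved


# Timestamps use the "YYYY-MM-DD HH:MM:SS" format from the table, which
# compares correctly as plain strings.
agg = UserTotals()
agg.apply("id-1", 2660, "2023-09-15 08:41:41", 1464)
agg.apply("id-2", 2660, "2023-09-15 08:41:34", 276)
agg.apply("id-3", 2533, "2023-09-15 09:01:48", 19573)
print(dict(agg.totals))  # {2660: 1740, 2533: 19573}

agg.apply("id-1", 2660, "2023-09-15 09:30:00", 1500)  # newer version of id-1
print(agg.totals[2660])  # 1776

agg.apply("id-1", 2660, "2023-09-15 07:00:00", 9999)  # older version: ignored
print(agg.totals[2660])  # 1776
```

The retraction step is what a plain streaming `groupBy("user_id").sum("total_time_saved")` over the raw stream cannot do. That query would add every version of each row it receives instead of only the latest one.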
I am very new to PySpark. Please suggest the best way to do this. My Spark and Python versions are 3.4.1 and 3.8, respectively. Thanks in advance!
What you seem to need is a stateful operation.
For example, flatMapGroupsWithState stores state for every group, and each new message lets you update the state of the group it belongs to.
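To illustrate the per-group state pattern without Spark, here is a plain-Python model of it. `update_user_state` and `run_micro_batch` are hypothetical names, not Spark APIs. The driver only stands in for what Spark does on each micro-batch. The sketch groups by user_id and assumes that each id always belongs to the same user_id, so the latest version of every id can live in that user's state.

```python
from itertools import groupby
from operator import itemgetter


def update_user_state(user_id, rows, state):
    """Hypothetical per-group function in the style of flatMapGroupsWithState:
    takes one key, that key's new rows, and its previous state; returns the
    output for the key and the updated state (id -> (updated_at, time_saved))."""
    latest = dict(state or {})
    for row in rows:
        prev = latest.get(row["id"])
        # Keep only the newest version of each id (string timestamps compare correctly).
        if prev is None or row["updated_at"] > prev[0]:
            latest[row["id"]] = (row["updated_at"], row["total_time_saved"])
    total = sum(saved for _, saved in latest.values())
    return (user_id, total), latest


def run_micro_batch(batch, store):
    """Hypothetical driver standing in for Spark: group one micro-batch by
    user_id and call the per-group function with each group's stored state."""
    outputs = []
    batch = sorted(batch, key=itemgetter("user_id"))
    for user_id, rows in groupby(batch, key=itemgetter("user_id")):
        out, store[user_id] = update_user_state(user_id, list(rows), store.get(user_id))
        outputs.append(out)
    return outputs


store = {}
first = run_micro_batch([
    {"id": "id-1", "user_id": 2672, "updated_at": "2023-09-15 01:55:50", "total_time_saved": 1056},
    {"id": "id-2", "user_id": 2672, "updated_at": "2023-09-15 01:39:34", "total_time_saved": 386},
], store)
print(first)  # [(2672, 1442)]

# A newer version of id-2 arrives in a later micro-batch.
second = run_micro_batch([
    {"id": "id-2", "user_id": 2672, "updated_at": "2023-09-15 02:10:00", "total_time_saved": 400},
], store)
print(second)  # [(2672, 1456)]
```

This state keeps every id a user has ever had, so it grows without bound. In a real stateful Spark query you would likely want a timeout or some other cleanup policy for it.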
Since flatMapGroupsWithState itself is not available in Python, you might need to consider switching to Scala or using one of the two methods mentioned in this post:
https://stackoverflow.com/a/49825585/8726538
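Since you are on Spark 3.4.1, also look at GroupedData.applyInPandasWithState. PySpark added it in Spark 3.4 as a Python counterpart of flatMapGroupsWithState that works on pandas DataFrames.
Hope this helps.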