Tags: apache-spark, pyspark, spark-structured-streaming

PySpark (Spark v3.4.1) Structured Streaming: how to write cumulative aggregated data to a Spark sink?


I am working on a PySpark Structured Streaming program to generate cumulative aggregations of streaming data. I use MongoDB and Kafka as the sources of the Spark read stream. I have tried foreachBatch and foreach in the Spark sink but could not meet my requirements. I am also not sure which outputMode of the Spark sink is the best fit for my requirements, and I am clueless about how to move forward from here. Below is the read-stream dataframe I have transformed so far from the source data.

+--------------------+-------+--------------+-------------------+-------------------+-------------------+----------+----------+----------------+
|                  id|user_id|profile_viewed|         created_at|         updated_at|  settings_end_date|start_time|  end_time|total_time_saved|
+--------------------+-------+--------------+-------------------+-------------------+-------------------+----------+----------+----------------+
|6500d4bce7c328000...|   2533|           143|2023-09-12 21:14:35|2023-09-15 09:01:48|2023-09-13 01:29:18|1694533475|1694548758|           19573|
|64fe9c7a39869c000...|   2660|            41|2023-09-11 04:50:02|2023-09-15 08:41:41|2023-09-11 04:53:56|1694388002|1694388236|            1464|
|650143cd2d1886000...|   2660|             6|2023-09-13 05:08:29|2023-09-15 08:41:34|2023-09-13 05:10:05|1694561909|1694562005|             276|
|6501453f6b71b8000...|   2660|             4|2023-09-13 05:14:39|2023-09-15 08:41:05|2023-09-13 05:17:06|1694562279|1694562426|             267|
|65014196ff8372000...|   2660|            70|2023-09-13 04:59:02|2023-09-15 08:40:44|2023-09-13 05:04:39|1694561342|1694561679|            2437|
|64e80cf5cfeb0e000...|   2655|            18|2023-09-15 05:07:49|2023-09-15 05:29:49|2023-09-15 05:20:49|1694734669|1694735449|            1320|
|650d5a497266eb2ca...|   2655|             2|2023-09-15 05:10:49|2023-09-15 05:14:49|2023-09-15 05:10:49|1694734849|1694734849|              60|
|6503b61c660ef2000...|   2672|             5|2023-09-15 01:40:44|2023-09-15 01:55:50|2023-09-15 01:55:50|1694722244|1694723150|            1056|
|6503b5621c53eb000...|   2672|             9|2023-09-15 01:37:38|2023-09-15 01:39:34|2023-09-15 01:39:34|1694722058|1694722174|             386|
|6503a7d81c53eb000...|   2515|            32|2023-09-15 00:39:52|2023-09-15 00:40:45|2023-09-15 00:40:45|1694718592|1694718645|            1013|
|6500c6e6602140000...|   1996|            98|2023-09-12 20:15:34|2023-09-14 22:54:31|2023-09-12 23:12:21|1694529934|1694540541|           13547|
|64fefdc4540b67000...|   2658|            65|2023-09-11 11:45:08|2023-09-14 20:04:15|2023-09-11 16:15:35|1694412908|1694429135|           18177|
|64f8ef0aec3e46000...|   1996|            41|2023-09-06 21:28:42|2023-09-14 18:53:06|2023-09-07 00:22:47|1694015922|1694026367|           11675|
+--------------------+-------+--------------+-------------------+-------------------+-------------------+----------+----------+----------------+

Please note: in this dataframe the id and user_id columns are not unique, i.e. there are duplicate ids and user_ids.

Now here is what I am looking for:

First, I want to keep only one row per id: the most recently updated one. In other words, if there are 5 records with the same id, I only want to keep the record whose updated_at value is the latest of the 5. It is also important that when new data arrives with an id that is already present in the dataframe, the existing row needs to be replaced by the newer one. Hope this makes sense.

Second, on top of that deduplicated dataframe, I want a cumulative aggregation: group by the user_id column and sum the total_time_saved column. (A batch-style sketch of both steps follows below.)
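
For clarity, this is roughly what I mean in plain batch terms (a sketch only, assuming the dataframe shown above is called df; as far as I understand, a plain window function like this is not supported on a streaming dataframe, which is exactly my problem):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Step 1: keep only the most recently updated row per id
w = Window.partitionBy("id").orderBy(F.col("updated_at").desc())
latest_df = (
    df.withColumn("rn", F.row_number().over(w))
      .filter(F.col("rn") == 1)
      .drop("rn")
)

# Step 2: total time saved per user, summed over the deduplicated rows
result_df = latest_df.groupBy("user_id").agg(
    F.sum("total_time_saved").alias("total_time_saved")
)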

I am very new to PySpark, so please suggest the best way to do this. My Spark and Python versions are 3.4.1 and 3.8 respectively. Thanks in advance.


Solution

  • What you seem to need are stateful operations.

    For example, flatMapGroupsWithState stores state for every group, and with every new incoming message you can update the state for the corresponding group.

    Since flatMapGroupsWithState is not available in PySpark, you might need to think about switching to Scala or using one of the two approaches mentioned in this post:

    https://stackoverflow.com/a/49825585/8726538
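
    Side note (worth verifying in your setup): since you are on Spark 3.4.1, PySpark also offers applyInPandasWithState, added in Spark 3.4.0 as a Pandas-based counterpart of flatMapGroupsWithState. A minimal sketch of the per-group-state idea follows, assuming streaming_df is the transformed streaming dataframe from the question; it only shows the running total per user_id, and the "keep the latest updated_at per id" rule would additionally have to be tracked inside the state (for example as per-id latest values).

    import pandas as pd
    from typing import Iterator, Tuple

    from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

    # Called for every user_id in every micro-batch; `state` survives across batches.
    def update_user_total(
        key: Tuple,
        batches: Iterator[pd.DataFrame],
        state: GroupState,
    ) -> Iterator[pd.DataFrame]:
        # Previous running total for this user, if any.
        (running_total,) = state.get if state.exists else (0,)

        # Simplification: this adds everything in the batch; honouring the
        # dedup-by-id requirement means keeping per-id latest values in the
        # state and recomputing the sum from them instead.
        for pdf in batches:
            running_total += int(pdf["total_time_saved"].sum())

        state.update((running_total,))
        yield pd.DataFrame({"user_id": [key[0]], "total_time_saved": [running_total]})

    cumulative = (
        streaming_df  # the transformed streaming dataframe from the question
        .groupBy("user_id")
        .applyInPandasWithState(
            update_user_total,
            outputStructType="user_id INT, total_time_saved LONG",
            stateStructType="total LONG",
            outputMode="update",
            timeoutConf=GroupStateTimeout.NoTimeout,
        )
    )

    The resulting dataframe can then be written with writeStream (update output mode, or foreachBatch if the totals need to be upserted into MongoDB).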

    Hope this helps.