Tags: pandas, dataframe, pyspark, group-by

Generate an output DataFrame like the one below in PySpark


The input DataFrame is below:

+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
| 10| 20| 11| 20|
| 20| 11| 10| 99|
| 10| 11| 20|  1|
| 30| 12| 20| 99|
| 10| 11| 20| 20|
| 40| 13| 15|  3|
| 30|  8| 11| 99|
+---+---+---+---+

I am using a join but not getting the desired result. How can I generate the output DataFrame below in PySpark?

+---+-------+---+-------+---+-------+---+-------+
|  A|A_Count|  B|B_Count|  C|C_Count|  D|D_Count|
+---+-------+---+-------+---+-------+---+-------+
| 10|      3|  8|      1| 10|      1|  1|      1|
| 20|      1| 11|      3| 11|      2|  3|      1|
| 30|      2| 12|      1| 15|      1| 20|      2|
| 40|      1| 13|      1| 20|      3| 99|      3|
+---+-------+---+-------+---+-------+---+-------+
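For anyone who wants to reproduce this, here is a minimal sketch that builds the input DataFrame, assuming an active SparkSession; the variable name `ab` matches the solution below:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Recreate the sample input DataFrame from the question
    ab = spark.createDataFrame(
        [
            (10, 20, 11, 20),
            (20, 11, 10, 99),
            (10, 11, 20, 1),
            (30, 12, 20, 99),
            (10, 11, 20, 20),
            (40, 13, 15, 3),
            (30, 8, 11, 99),
        ],
        ["A", "B", "C", "D"],
    )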

Solution

  • The code below works for me: it computes value counts per column, tags each sorted per-column result with a sequential row number, and then joins all of the results on that row number.

    from functools import reduce

    from pyspark.sql import Window
    from pyspark.sql.functions import col, count, monotonically_increasing_id, row_number
    from pyspark.sql.types import IntegerType

    list_of_sdf = []
    # Window used to assign sequential row numbers to each per-column result
    w0 = Window.orderBy(monotonically_increasing_id())

    for coll in ab.columns:
        ab = ab.withColumn(coll, col(coll).cast(IntegerType()))
        # Count how often each distinct value appears in this column
        ab1 = ab.groupBy(col(coll)).agg(count(col(coll)).alias(f"{coll}_Count"))
        # Sort by value and add a row id so the per-column results can be joined
        list_of_sdf.append(ab1.sort([coll]).withColumn("row", row_number().over(w0)))

    def join_dfs(df1, df2, join_column="row"):
        return df1.join(df2, on=join_column, how="inner")

    # Perform successive joins using reduce, then drop the helper column
    if list_of_sdf:
        joined_df = reduce(join_dfs, list_of_sdf).drop("row")
    else:
        print("The list of DataFrames is empty.")