
Is it possible to groupBy a column that is not the key and append the aggregated results to the original record in Apache Flink/Beam?

Let’s say I have data as follows:

Col1  Col2  Col3  Col4
A     ABC   101   1
B     ABC   102   1
C     ABCD  101   1
D     ABCD  101   1
E     ABC   101   1

I would like to group by Col2 and by Col3 independently, compute the sum of Col4 (or any other, more complex operation) for each group, and store the aggregates back on each record as follows. For simplicity, I've kept Col4 as 1 so that the sum and the count are the same.

Col1  Col2  Col2Agg   Col3  Col3Agg  Col4  Explanation
A     ABC   (ABC,1)   101   (101,1)  1     Seeing ABC, 101 for the first time
B     ABC   (ABC,2)   102   (102,1)  1     Have already seen ABC once, 102 for the first time
C     ABCD  (ABCD,1)  101   (101,2)  1     Have seen ABCD once and 101 twice
D     ABCD  (ABCD,2)  101   (101,3)  1     Have seen ABCD twice and 101 thrice
E     ABC   (ABC,3)   101   (101,4)  1     Have seen ABC thrice and 101 four times
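
To pin down the semantics of the table above, here is a minimal single-process sketch in plain Python. It is not Beam code: it only restates the desired running totals, assuming rows arrive in the order shown. The function name `attach_running_counts` and the tuple layout are illustrative assumptions.

```python
from collections import defaultdict


def attach_running_counts(rows):
    """Yield one output row per input row, with running totals attached.

    Each input row is (Col1, Col2, Col3, Col4). The totals depend on the
    order in which rows are seen.
    """
    col2_totals = defaultdict(int)  # running sum of Col4 per Col2 value
    col3_totals = defaultdict(int)  # running sum of Col4 per Col3 value
    for col1, col2, col3, col4 in rows:
        col2_totals[col2] += col4
        col3_totals[col3] += col4
        yield (col1, col2, (col2, col2_totals[col2]),
               col3, (col3, col3_totals[col3]), col4)


rows = [
    ("A", "ABC", 101, 1),
    ("B", "ABC", 102, 1),
    ("C", "ABCD", 101, 1),
    ("D", "ABCD", 101, 1),
    ("E", "ABC", 101, 1),
]
result = list(attach_running_counts(rows))
```

The two dictionaries here play the role that persisted per-key state would have to play in a distributed pipeline.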

I am exploring whether such logic can be implemented in Apache Beam. I plan to persist Col2Agg and Col3Agg as state so that the counts aren't lost when I restart the system. I would also like to keep the number of output records the same as the number of input records.


  • Yes, there is. The feature of Beam that is the best fit for this use case is "stateful DoFn", also called "stateful ParDo".

    The official documentation covers stateful processing, and the introductory blog post uses an example just like the one you describe.

    There is an important caveat: Beam is designed for massively parallel computation, and parallelism and strict ordering work against each other. If you need elements to be processed in a particular order, you will quickly hit scalability and cost limitations. But if you simply want to attach a count of "what you've seen so far" to each element, stateful DoFn will work well.