python apache-flink google-cloud-dataflow apache-beam flink-streaming

Is it possible to groupBy a column that is not the key and append the aggregated results to the original record in Apache Flink/Beam?


Let's say I have data as follows:

Col1  Col2  Col3  Col4
A     ABC   101   1
B     ABC   102   1
C     ABCD  101   1
D     ABCD  101   1
E     ABC   101   1

I would like to group by Col2 and by Col3 independently, compute the sum (or any other, more complex operation) of Col4 for each, and store the aggregates back on each record as follows. For simplicity, I've kept Col4 as 1 so that the sum and the count are the same.

Col1  Col2  Col2Agg   Col3  Col3Agg  Col4  Explanation
A     ABC   (ABC,1)   101   (101,1)  1     Seeing ABC and 101 for the first time
B     ABC   (ABC,2)   102   (102,1)  1     Have already seen ABC once; seeing 102 for the first time
C     ABCD  (ABCD,1)  101   (101,2)  1     Have seen ABCD once and 101 twice
D     ABCD  (ABCD,2)  101   (101,3)  1     Have seen ABCD twice and 101 thrice
E     ABC   (ABC,3)   101   (101,4)  1     Have seen ABC thrice and 101 four times
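To pin down what the expected table computes, here is a plain-Python sketch (not Beam code), assuming rows are handled one at a time in the order shown and that each grouped column keeps its own running sum of Col4 per distinct value. The function attach_running_sums and the ROWS literal are hypothetical.

```python
from collections import defaultdict

# Hypothetical literal for the input table above (Col3 stored as int).
ROWS = [
    {"Col1": "A", "Col2": "ABC", "Col3": 101, "Col4": 1},
    {"Col1": "B", "Col2": "ABC", "Col3": 102, "Col4": 1},
    {"Col1": "C", "Col2": "ABCD", "Col3": 101, "Col4": 1},
    {"Col1": "D", "Col2": "ABCD", "Col3": 101, "Col4": 1},
    {"Col1": "E", "Col2": "ABC", "Col3": 101, "Col4": 1},
]


def attach_running_sums(rows, group_cols, value_col="Col4"):
    """Return enriched copies of rows, exactly one output row per input row."""
    # One independent running total per grouped column, keyed by that column's value.
    totals = {col: defaultdict(int) for col in group_cols}
    enriched_rows = []
    for row in rows:
        enriched = dict(row)  # copy so the input rows are left untouched
        for col in group_cols:
            key = row[col]
            totals[col][key] += row[value_col]
            enriched[col + "Agg"] = (key, totals[col][key])
        enriched_rows.append(enriched)
    return enriched_rows


for r in attach_running_sums(ROWS, ["Col2", "Col3"]):
    print(r["Col1"], r["Col2Agg"], r["Col3Agg"])
```

The totals are updated before each pair is attached, so every count includes the current row, matching the Col2Agg and Col3Agg columns above.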

I am exploring whether such logic can be implemented in Apache Beam. I am planning to persist Col2Agg and Col3Agg as state so that the counts aren't lost when I restart the system. I would also like the number of output records to match the number of input records.


Solution

  • Yes, it is. The Beam feature that best fits this use case is the "stateful DoFn", also called "stateful ParDo".

    The official documentation is at https://beam.apache.org/documentation/programming-guide/#state-and-timers, and the introductory blog post at https://beam.apache.org/blog/stateful-processing/ uses an example very much like yours.

    There is an important caveat: Beam is designed for massively parallel computation, and parallelism and ordering are fundamentally at odds. If you need elements processed in a particular order (for example, to guarantee that record E is the one tagged (ABC,3)), you will quickly hit scalability and cost limits. But if you simply want to attach a count of "what you've seen so far" to each element, a stateful DoFn works well.