apache-flink, flink-streaming

How to aggregate after an interval join


I have 2 streams joined with an interval join; streamA is the left stream and streamB is the right one. The code looks like this:

streamA
  .keyBy((PojoA a) -> a.common_key)
  .intervalJoin(
      streamB
        .keyBy((PojoB b) -> b.common_key)
  )
  .between(Time.seconds(0), Time.minutes(5))
  .process(new ProcessJoinFunction<PojoA, PojoB, Result>() {

      @Override
      public void processElement(PojoA left, PojoB right, Context ctx, Collector<Result> out) throws Exception {
          out.collect(Result.build(left, right));
      }
  })

After the interval join I get a Result POJO built from PojoA and PojoB. Result contains dimension and metric fields from both PojoA and PojoB, such as below:

class Result {
   long userId;   // the common key
   String name;   // from PojoA
   long number;   // from PojoA
   String shop;   // from PojoB
   long orders;   // from PojoA
   double price;  // from PojoA
}

The situation is that one streamA record may be matched by multiple streamB records, so after the join I need to aggregate the joined stream: sum the orders, compute the price as the sum of prices divided by the sum of orders, and write the values back into the Result POJO. For example, given 2 joined records:

joined record 1: (123, "nameA", 455, "shop", 3, 4.2)
joined record 2: (123, "nameA", 455, "shop", 6, 4.8)

after processing, the result should be: (123, "nameA", 455, "shop", 9, 1.0), where 9 = 3 + 6 and 1.0 = (4.2 + 4.8) / (3 + 6)

How can I write an aggregation function to implement this?


Solution

  • You can apply a simple reduce function after a .keyBy, as in:

      .keyBy(r -> r.userId)
      .reduce(new YourReduceFunction())

    where YourReduceFunction looks something like:

    public class YourReduceFunction implements ReduceFunction<Result> {
        @Override
        public Result reduce(Result v1, Result v2) {
            // Note: Result needs a new sumOfPrices field to correctly calc the
            // average price across more than two records.
            Result merged = new Result();
            merged.userId = v1.userId;   // dimension fields are equal on both sides
            merged.name = v1.name;
            merged.number = v1.number;
            merged.shop = v1.shop;
            merged.orders = v1.orders + v2.orders;
            merged.sumOfPrices = v1.sumOfPrices + v2.sumOfPrices;
            merged.price = merged.sumOfPrices / merged.orders;
            return merged;
        }
    }
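
    Note that a plain keyBy + reduce on an unbounded keyed stream emits an
    updated (rolling) Result for every incoming joined record; if you only
    want one value per key per time period, you can run the same reduce
    inside a window. Below is a minimal end-to-end sketch under a few
    assumptions that aren't in your code: sumOfPrices is the extra field
    mentioned above and is seeded with price for each joined record, and
    the 5-minute tumbling event-time window is just an example bound for
    the aggregation.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    DataStream<Result> joined = streamA
        .keyBy((PojoA a) -> a.common_key)
        .intervalJoin(streamB.keyBy((PojoB b) -> b.common_key))
        .between(Time.seconds(0), Time.minutes(5))
        .process(new ProcessJoinFunction<PojoA, PojoB, Result>() {
            @Override
            public void processElement(PojoA left, PojoB right, Context ctx, Collector<Result> out) {
                Result r = Result.build(left, right); // build the joined record as in the question
                r.sumOfPrices = r.price;              // seed the running sum of prices
                out.collect(r);
            }
        });

    // Rolling aggregation: emits an updated Result per key for every joined record.
    DataStream<Result> rolling = joined
        .keyBy(r -> r.userId)
        .reduce(new YourReduceFunction());

    // Alternative: one Result per key and per 5-minute event-time window.
    DataStream<Result> windowed = joined
        .keyBy(r -> r.userId)
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .reduce(new YourReduceFunction());

    In both variants the reduce is applied incrementally as records arrive,
    so only one partially aggregated Result per key (and per window, in the
    windowed case) is kept in state.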