I have two streams that I combine with an interval join; streamA is the left stream and streamB is the right. The code is as follows:
    streamA
        .keyBy(a -> a.common_key)
        .intervalJoin(streamB.keyBy(b -> b.common_key))
        .between(Time.seconds(0), Time.minutes(5))
        .process(new ProcessJoinFunction<PojoA, PojoB, Result>() {
            @Override
            public void processElement(PojoA left, PojoB right, Context ctx, Collector<Result> out) throws Exception {
                out.collect(Result.build(left, right));
            }
        });
After the interval join I get a result POJO built from PojoA and PojoB. result contains dimension and metric fields from both PojoA and PojoB, such as:
    class result {
        long userId;   // the common key
        String name;   // from pojoA
        long number;   // from pojoA
        String shop;   // from pojoB
        long orders;   // from pojoA
        double price;  // from pojoA
    }
One streamA record may match multiple streamB records, so after joining I need to aggregate the joined stream, summing orders and price, and set the totals back on the result POJO. For example, given two joined records:
joined record 1: (123, "nameA", 455, "shop", 3, 4.2)
joined record 2: (123, "nameA", 455, "shop", 6, 4.8)
After processing, the result should be (123, "nameA", 455, "shop", 9, 1.0), where orders = 3 + 6 = 9 and price = (4.2 + 4.8) / (3 + 6) = 1.0.
How can I write an aggregation function to implement this?
You can apply a simple reduce function after a .keyBy, as in:
    .keyBy(r -> r.getUserId())
    .reduce(new YourReduceFunction())
where YourReduceFunction looks something like:
    public class YourReduceFunction implements ReduceFunction<result> {
        @Override
        public result reduce(result v1, result v2) {
            // Calculate the sum of orders, the sum of prices, and the average price.
            // Note that you need a new sumOfPrices field to calculate the average
            // price correctly, because v1 may already hold an averaged price.
        }
    }
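To make the merge step concrete, here is a minimal sketch of the arithmetic written as plain Java, so it can be run without Flink on the classpath. The class name JoinedResult and the static merge method are hypothetical stand-ins for the result POJO and for the body of reduce(v1, v2). It also assumes that each freshly joined record starts with sumOfPrices equal to its own price, which the question does not state.

```java
// Hypothetical stand-in for the joined result POJO. Field names follow the
// question; sumOfPrices is the extra field suggested in the answer.
class JoinedResult {
    long userId;        // common key
    String name;        // from pojoA
    long number;        // from pojoA
    String shop;        // from pojoB
    long orders;        // running sum of orders
    double price;       // running average: sumOfPrices / orders
    double sumOfPrices; // running sum of the original prices

    JoinedResult(long userId, String name, long number, String shop,
                 long orders, double price) {
        this.userId = userId;
        this.name = name;
        this.number = number;
        this.shop = shop;
        this.orders = orders;
        this.price = price;
        // Assumption: a freshly joined record contributes its own price to the sum.
        this.sumOfPrices = price;
    }

    // Same shape as reduce(v1, v2): combine two partial results into one.
    static JoinedResult merge(JoinedResult v1, JoinedResult v2) {
        JoinedResult out = new JoinedResult(
                v1.userId, v1.name, v1.number, v1.shop,
                v1.orders + v2.orders, 0.0);
        // Carry the raw sum forward so later merges do not average an average.
        out.sumOfPrices = v1.sumOfPrices + v2.sumOfPrices;
        out.price = out.orders == 0 ? 0.0 : out.sumOfPrices / out.orders;
        return out;
    }
}
```

Merging the two example records (orders 3 and 6, prices 4.2 and 4.8) gives orders = 9 and a price of approximately 1.0. In a Flink job the body of merge would go inside reduce. Keep in mind that a keyed reduce on an unbounded stream emits an updated running result for each incoming record rather than one final record per key.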