
How to manipulate 3 DataStreams in one Flink job?


We have 3 Java POJOs:

class Foo{
 int id;
 String name;
 List<Bar1> list1;
 List<Bar2> list2;
}

class Bar1{
 int id;
 String field_x;
 String field_y;
}

class Bar2{
 int id;
 String field_a;
 String field_b;
}

We also have 3 DataStreams in our Flink job:

class Test{
 public static void main(...){
  DataStream<Foo> ds1 = ...;
  DataStream<Bar1> ds2 = ...;
  DataStream<Bar2> ds3 = ...;
 }
}

For each id, there will be only one Foo object, while there can be multiple Bar1 and Bar2 objects.

What we want to do is: for each Foo in ds1, find all Bar1 objects with the same id in ds2 and put them into list1, and find all Bar2 objects with the same id in ds3 and put them into list2.

What is the best way to go?


Solution

  • Flink's DataStream operators support up to two input streams. There are two common ways to implement operations on three streams:

    1. With two binary operations. This is simple in your case because Bar1 and Bar2 are not related to each other: the first operation attaches the Bar1s to each Foo and the second attaches the Bar2s. This would look roughly as follows:
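    DataStream<Foo> withList1 = ds1
      .connect(ds2)
      // key both inputs by id so that a Foo and its Bar1s reach the same parallel instance
      .keyBy(foo -> foo.id, bar1 -> bar1.id)
      .process(
        // your processing logic, e.g. buffer the Foo and its Bar1s in keyed state
        new KeyedCoProcessFunction<Integer, Foo, Bar1, Foo>(){...});
    DataStream<Foo> withList1AndList2 = withList1
      .connect(ds3)
      .keyBy(foo -> foo.id, bar2 -> bar2.id)
      .process(
        // your processing logic
        new KeyedCoProcessFunction<Integer, Foo, Bar2, Foo>(){...});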
    
    2. By unioning all three streams into a single stream with a common data type (for example, a POJO with three fields foo, bar1, and bar2, of which only one field is set per record) and processing the unioned stream with a single-input operator.
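    // map Foo to CommonType
    DataStream<CommonType> common1 = ds1.map(new MapFunction<Foo, CommonType>(){...});
    // map Bar1 to CommonType
    DataStream<CommonType> common2 = ds2.map(new MapFunction<Bar1, CommonType>(){...});
    // map Bar2 to CommonType
    DataStream<CommonType> common3 = ds3.map(new MapFunction<Bar2, CommonType>(){...});
    
    // union() requires all streams to have the same type, so union the mapped streams
    DataStream<Foo> withList1AndList2 = common1.union(common2, common3)
      // CommonType has no id of its own: take it from whichever field is set
      .keyBy(c -> c.foo != null ? c.foo.id : c.bar1 != null ? c.bar1.id : c.bar2.id)
      .process(
        // your processing logic
        new KeyedProcessFunction<Integer, CommonType, Foo>(){...});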
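For option 1, the per-key work of the first KeyedCoProcessFunction boils down to buffering the Foo and its Bar1s until you decide to emit. The sketch below models that logic in plain Java so it can run without Flink; it is an illustration under stated assumptions, not Flink code. The class `FooBar1Joiner` and its methods `onFoo`, `onBar1` and `emit` are hypothetical names, the two maps stand in for Flink keyed state (a `ValueState<Foo>` and a `ListState<Bar1>` per id), and the trigger for `emit` is left open on purpose (see the options at the end of this answer).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal versions of the question's POJOs (only the fields this sketch needs).
class Bar1 {
    int id;
    String field_x;
    Bar1(int id, String field_x) { this.id = id; this.field_x = field_x; }
}

class Foo {
    int id;
    String name;
    List<Bar1> list1 = new ArrayList<>();
    Foo(int id, String name) { this.id = id; this.name = name; }
}

// Hypothetical plain-Java model of KeyedCoProcessFunction<Integer, Foo, Bar1, Foo>.
// In a real job each map entry would be per-key state (ValueState<Foo>, ListState<Bar1>).
class FooBar1Joiner {
    private final Map<Integer, Foo> fooState = new HashMap<>();
    private final Map<Integer, List<Bar1>> bar1State = new HashMap<>();

    // Counterpart of processElement1: remember the Foo for its id.
    void onFoo(Foo foo) {
        fooState.put(foo.id, foo);
    }

    // Counterpart of processElement2: buffer the Bar1, since its Foo may not have arrived yet.
    void onBar1(Bar1 bar1) {
        bar1State.computeIfAbsent(bar1.id, k -> new ArrayList<>()).add(bar1);
    }

    // Call once all Bar1s for this id are considered received (e.g. from onTimer).
    // Returns the enriched Foo and clears its state, or null if no Foo has been seen yet.
    Foo emit(int id) {
        Foo foo = fooState.remove(id);
        if (foo == null) {
            return null; // no Foo yet: keep any buffered Bar1s
        }
        List<Bar1> bars = bar1State.remove(id);
        if (bars != null) {
            foo.list1.addAll(bars);
        }
        return foo;
    }
}
```

Because both inputs are buffered, the order in which a Foo and its Bar1s arrive does not matter. The second stage (enriched Foo plus Bar2) would follow the same pattern with `list2`.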
    

    You can also union only ds2 and ds3 (after mapping them to a common type) and connect the result with ds1 using a binary operator.
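For option 2, the common type and the routing done by the single-input operator could look roughly like the following plain-Java sketch. It assumes the CommonType shape described above (three fields, exactly one non-null); the factory methods `of(...)`, the `id()` helper, and the `UnionedJoiner` class with `process`/`emit` are hypothetical, and the maps again stand in for Flink keyed state inside a `KeyedProcessFunction<Integer, CommonType, Foo>`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal versions of the question's POJOs.
class Bar1 { int id; String field_x; Bar1(int id, String x) { this.id = id; this.field_x = x; } }
class Bar2 { int id; String field_a; Bar2(int id, String a) { this.id = id; this.field_a = a; } }
class Foo {
    int id;
    String name;
    List<Bar1> list1 = new ArrayList<>();
    List<Bar2> list2 = new ArrayList<>();
    Foo(int id, String name) { this.id = id; this.name = name; }
}

// Hypothetical common type: exactly one of the three fields is set per record.
class CommonType {
    Foo foo;
    Bar1 bar1;
    Bar2 bar2;

    static CommonType of(Foo foo) { CommonType c = new CommonType(); c.foo = foo; return c; }
    static CommonType of(Bar1 bar1) { CommonType c = new CommonType(); c.bar1 = bar1; return c; }
    static CommonType of(Bar2 bar2) { CommonType c = new CommonType(); c.bar2 = bar2; return c; }

    // What the keyBy() key selector extracts: the id of whichever field is set.
    int id() {
        if (foo != null) return foo.id;
        if (bar1 != null) return bar1.id;
        return bar2.id;
    }
}

// Hypothetical model of the per-key logic of the single-input operator.
class UnionedJoiner {
    private final Map<Integer, Foo> foos = new HashMap<>();
    private final Map<Integer, List<Bar1>> bar1s = new HashMap<>();
    private final Map<Integer, List<Bar2>> bar2s = new HashMap<>();

    // Counterpart of processElement: route the record by the field that is set.
    void process(CommonType c) {
        int id = c.id();
        if (c.foo != null) {
            foos.put(id, c.foo);
        } else if (c.bar1 != null) {
            bar1s.computeIfAbsent(id, k -> new ArrayList<>()).add(c.bar1);
        } else {
            bar2s.computeIfAbsent(id, k -> new ArrayList<>()).add(c.bar2);
        }
    }

    // Returns the Foo with both lists filled and clears its state, or null if no Foo yet.
    Foo emit(int id) {
        Foo foo = foos.remove(id);
        if (foo == null) {
            return null; // keep the buffered Bar1s/Bar2s until the Foo arrives
        }
        List<Bar1> l1 = bar1s.remove(id);
        List<Bar2> l2 = bar2s.remove(id);
        if (l1 != null) foo.list1.addAll(l1);
        if (l2 != null) foo.list2.addAll(l2);
        return foo;
    }
}
```

Compared with two chained binary operators, this keeps all per-id state in one operator, at the cost of the mapping step and the wrapper type.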

    The bigger problem might be to identify when all Bar1 and Bar2 events have been received, so that you can emit a result. Again, there are a few options (depending on your use case):

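    1. If Foo knows how many Bar1 and Bar2 events it needs to wait for, the solution is straightforward: emit the result as soon as that many events have arrived for its id.
    2. If Foo does not know how many events to wait for, you can try to send a notification that signals that the last Bar1 or Bar2 was sent.
    3. You can also work with a timeout if you know that all Bar1 and Bar2 events should arrive within x seconds/minutes/etc., for example by registering a timer in the process function and emitting in onTimer().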
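Options 1 and 3 can be combined in the same per-key logic. The following plain-Java sketch assumes hypothetical fields `expectedBar1` and `expectedBar2` on Foo (they are not in the question's POJO); `CompletionTracker` and its methods are likewise hypothetical. Each `on*` method returns the finished Foo as soon as the expected counts are reached, and `onTimeout` models what a Flink `onTimer()` callback, for a timer registered when the Foo arrived, might do: emit whatever has been collected so far.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class Bar1 { int id; Bar1(int id) { this.id = id; } }
class Bar2 { int id; Bar2(int id) { this.id = id; } }

class Foo {
    int id;
    // Hypothetical fields, not in the original Foo: how many Bar1/Bar2 to wait for.
    int expectedBar1;
    int expectedBar2;
    List<Bar1> list1 = new ArrayList<>();
    List<Bar2> list2 = new ArrayList<>();
    Foo(int id, int expectedBar1, int expectedBar2) {
        this.id = id;
        this.expectedBar1 = expectedBar1;
        this.expectedBar2 = expectedBar2;
    }
}

// Hypothetical per-id completion logic; the maps stand in for Flink keyed state.
class CompletionTracker {
    private final Map<Integer, Foo> foos = new HashMap<>();
    private final Map<Integer, List<Bar1>> bar1s = new HashMap<>();
    private final Map<Integer, List<Bar2>> bar2s = new HashMap<>();

    Foo onFoo(Foo foo) {
        foos.put(foo.id, foo);
        return tryComplete(foo.id);
    }

    Foo onBar1(Bar1 b) {
        bar1s.computeIfAbsent(b.id, k -> new ArrayList<>()).add(b);
        return tryComplete(b.id);
    }

    Foo onBar2(Bar2 b) {
        bar2s.computeIfAbsent(b.id, k -> new ArrayList<>()).add(b);
        return tryComplete(b.id);
    }

    // Option 3: the deadline passed, so emit the (possibly incomplete) Foo if there is one.
    Foo onTimeout(int id) {
        return foos.containsKey(id) ? finish(id) : null;
    }

    // Option 1: emit only once both expected counts have been reached.
    private Foo tryComplete(int id) {
        Foo foo = foos.get(id);
        if (foo == null) {
            return null; // cannot know the expected counts before the Foo arrives
        }
        int n1 = bar1s.getOrDefault(id, Collections.emptyList()).size();
        int n2 = bar2s.getOrDefault(id, Collections.emptyList()).size();
        return (n1 >= foo.expectedBar1 && n2 >= foo.expectedBar2) ? finish(id) : null;
    }

    // Attach the buffered events to the Foo and clear all state for this id.
    private Foo finish(int id) {
        Foo foo = foos.remove(id);
        List<Bar1> l1 = bar1s.remove(id);
        List<Bar2> l2 = bar2s.remove(id);
        if (l1 != null) foo.list1.addAll(l1);
        if (l2 != null) foo.list2.addAll(l2);
        return foo;
    }
}
```

Note that in this sketch a Bar1 or Bar2 arriving after its Foo was already emitted would be buffered again with no Foo to attach to; a real job would need a rule (or a cleanup timer) for such late events.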