
How to connect more than 2 streams in Flink?


I have three keyed data streams of different types:

DataStream<A> first;
DataStream<B> second;
DataStream<C> third;

Each stream has its own processing logic, and all three share state. I want to connect these three streams so that the respective processing function is triggered whenever data is available on any of them. Connecting two streams is possible:

first.connect(second).process(<CoProcessFunction>)

I can't use union (which accepts multiple data streams) because the types are different. I also want to avoid creating a wrapper and converting all the streams into the same type.


Solution

  • The wrapper approach isn't too bad, really. You can create an EitherOfThree<T1, T2, T3> wrapper class that's similar to Flink's existing Either<Left, Right>, and then process a stream of those records in a single function. Something like:

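        // Flink can't infer a lambda's generic return type (type erasure), so pass a hint via returns().
        // TypeHint and TypeInformation live in org.apache.flink.api.common.typeinfo.
        TypeInformation<EitherOfThree<A, B, C>> type = TypeInformation.of(new TypeHint<EitherOfThree<A, B, C>>() {});
        DataStream<EitherOfThree<A, B, C>> combo = first.map(r -> new EitherOfThree<A, B, C>(r, null, null)).returns(type)
            .union(second.map(r -> new EitherOfThree<A, B, C>(null, r, null)).returns(type))
            .union(third.map(r -> new EitherOfThree<A, B, C>(null, null, r)).returns(type));
        // For keyed state shared across the three inputs, key the combined stream first: combo.keyBy(...).process(...)
        combo.process(new MyProcessFunction());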
    

    Flink's Either class has a more elegant implementation, but for your use case something simple should work.
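The answer leaves EitherOfThree itself undefined. One possible version is sketched below, assuming plain null-checked fields rather than the subclass-based design of Flink's Either. The factory methods ofFirst/ofSecond/ofThird and the fold helper are hypothetical additions for illustration, not Flink API. Making it a public class with a public no-arg constructor and public non-final fields is intended to satisfy Flink's POJO rules, so Flink can use its POJO serializer instead of falling back to Kryo.

```java
import java.util.function.Function;

/**
 * Hypothetical three-way "either" type, loosely modelled on Flink's Either<L, R>.
 * A well-formed instance has exactly one non-null field.
 */
public class EitherOfThree<T1, T2, T3> {
    // Public, non-final fields so the class follows Flink's POJO rules.
    public T1 first;
    public T2 second;
    public T3 third;

    /** No-arg constructor required by Flink's POJO serializer. */
    public EitherOfThree() {}

    /** Same (r, null, null) calling style as the answer; exactly one argument must be non-null. */
    public EitherOfThree(T1 first, T2 second, T3 third) {
        int nonNull = (first != null ? 1 : 0) + (second != null ? 1 : 0) + (third != null ? 1 : 0);
        if (nonNull != 1) {
            throw new IllegalArgumentException("exactly one value must be non-null, got " + nonNull);
        }
        this.first = first;
        this.second = second;
        this.third = third;
    }

    // Hypothetical convenience factories, one per input stream.
    public static <T1, T2, T3> EitherOfThree<T1, T2, T3> ofFirst(T1 value) {
        return new EitherOfThree<>(value, null, null);
    }

    public static <T1, T2, T3> EitherOfThree<T1, T2, T3> ofSecond(T2 value) {
        return new EitherOfThree<>(null, value, null);
    }

    public static <T1, T2, T3> EitherOfThree<T1, T2, T3> ofThird(T3 value) {
        return new EitherOfThree<>(null, null, value);
    }

    public boolean isFirst() { return first != null; }

    public boolean isSecond() { return second != null; }

    public boolean isThird() { return third != null; }

    /** Applies whichever function matches the value that is present. */
    public <R> R fold(Function<? super T1, ? extends R> onFirst,
                      Function<? super T2, ? extends R> onSecond,
                      Function<? super T3, ? extends R> onThird) {
        if (first != null) return onFirst.apply(first);
        if (second != null) return onSecond.apply(second);
        if (third != null) return onThird.apply(third);
        throw new IllegalStateException("empty EitherOfThree");
    }
}
```

Inside MyProcessFunction, processElement can then branch on isFirst()/isSecond()/isThird() (or call fold) and hand each value to the corresponding per-stream logic, while the shared state is declared once in that single function.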