kotlin · apache-flink · flink-streaming · flink-batch

How to set up two different process functions in a single Flink job config?


I have a single Flink job with 3 different (optional) inputs, and the same output type is emitted for each type of input.

input1 uses KeyedProcessFunction(); input2 uses ProcessWindowFunction().

Basically, the job's input is a union of three inputs with a single output. How do we configure the Flink job so that, within a single job, I can use both of the process functions above?

I want to use KeyedProcessFunction() for input1 and ProcessWindowFunction() for input2.

The code below handles input2 only. If I want to handle input1, I have to use .process(processFunction()) instead of .process(MyProcessWindowFunction()) in the job config. How do we configure the job so that I can use both functions in a single job?

    fun setupJob(env: StreamExecutionEnvironment) {
        val testStream = env.sampleStream()
            .keyBy { it.f0 }
            .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
            .process(MyProcessWindowFunction())

        testStream.map { it.toKafkaMessage() }
            .kafkaSink<SampleOutput>()
    }

Solution

  • A single Flink job can contain several independent pipelines; each one is built on the same StreamExecutionEnvironment, and they are all deployed together when env.execute() is called. E.g.,

    env.fromSource(input1)
      .keyBy(...)
      .process(new MyKeyedProcessFunction())
      .sinkTo(sink1)
    
    env.fromSource(input2)
      .keyBy(...)
      .window(...)
      .process(new MyProcessWindowFunction())
      .sinkTo(sink2)
    
    env.execute()
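
    Applied to the question's setup, a sketch in Kotlin might look like the following. This assumes the sampleStream()-style source helpers, the toKafkaMessage() mapper, and the kafkaSink() extension from the question exist (input1Stream() and input2Stream() are hypothetical names), and that both process functions emit the same output type so the two result streams can be unioned into one sink:

    ```kotlin
    fun setupJob(env: StreamExecutionEnvironment) {
        // Pipeline 1: keyed, element-at-a-time processing for input1
        val out1 = env.input1Stream()          // hypothetical source helper
            .keyBy { it.f0 }
            .process(MyKeyedProcessFunction())

        // Pipeline 2: session-windowed processing for input2
        val out2 = env.input2Stream()          // hypothetical source helper
            .keyBy { it.f0 }
            .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
            .process(MyProcessWindowFunction())

        // Both pipelines produce the same type, so union them and
        // write everything through a single sink
        out1.union(out2)
            .map { it.toKafkaMessage() }
            .kafkaSink<SampleOutput>()
    }
    ```

    Note that DataStream.union requires both streams to have identical element types; if the two process functions emit different types, map each to a common type before the union.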