Search code examples
scalatemplatesgenericsapache-flink

Does anyone have an example of a generic ProcessFunction in Flink?


By "generic", I mean able to accept any type of object as input and return that same object as output.

Let's say the function's job is to serialize each element into json and write that as a side output.

class MyProcessFunction() extends ProcessFunction[? , ?] {

    def processElement(element: ?, ctx: ProcessFunction[?, ?]#Context, out: Collector[?]): Unit = ??? 

    ... 
}

Can I define this in such a way that it will be usable by different types of inputs?


Solution

  • You can do that by making Your class generic. So, You would have something like:

    class MyProcessFunction[T] extends ProcessFunction[T, T] {
      override def processElement(value: T, ctx: ProcessFunction[T, T]#Context, out: Collector[T]): Unit = ???
    }
    

    This way You will be able to pin down the type when You will create the instance of the function.