Tags: scala, apache-flink, flink-streaming

Too Many Arguments for reduce [Flink 1.9 in Scala]


I am trying to use Flink's Incremental Window Aggregation with a ReduceFunction, for a project I am doing, to return a single value that is the minimum in the time window, together with the window boundaries.

def aggregation(run1Stream: DataStream[myClass], windowSize: Time = Time.hours(1), windowSlide: Time = Time.minutes(2)): DataStream[myClass] = {
    run1Stream
      .keyBy(x => x.key)
      .timeWindow(windowSize, windowSlide)
      // run an incremental reduce on the window aggregation
      .reduce(new minVal(), new AssignWindowEndProcessFunction())
  }
class minVal extends ReduceFunction[myClass] {
  override def reduce(r1: myClass, r2: myClass) = {
    (r1: myClass, r2: myClass) => {if (r1.value > r2.value) r2 else r1}
  }
}
class AssignWindowEndProcessFunction extends ProcessWindowFunction[myClass, (myClass,Long, Long), String, TimeWindow] {

  override def process(key: String,
                       ctx: Context,
                       input: Iterable[myClass],
                       out: Collector[(myClass,Long, Long)]): Unit = {
    val minVal = input.head
    val windowStart = ctx.window.getStart
    val windowEnd = ctx.window.getEnd
    out.collect((minVal, windowStart ,windowEnd))
  }
}

The error I am getting is: Cannot resolve overloaded method 'reduce'

Does anyone see any major issues with my implementation?


Solution

  • In my case the issue ended up being a type mismatch in the enclosing function, which one of my coworkers caught. The output of the .reduce() call didn't match the function's declared return type, which is what produced the overloaded method error. Changing the return type from DataStream[myClass] to DataStream[(myClass, Long, Long)] fixed it:

      def aggregation(run1Stream: DataStream[myClass], windowSize: Time = Time.hours(1), windowSlide: Time = Time.minutes(2)): DataStream[(myClass, Long, Long)] = {
        run1Stream
          .keyBy(x => x.key)
          .timeWindow(windowSize, windowSlide)
          // run an incremental reduce on the window aggregation
          .reduce(new minVal(), new AssignWindowEndProcessFunction())
      }
    

    In addition, I had to change the reduce function to

    class minVal extends ReduceFunction[myClass] {
      override def reduce(r1: myClass, r2: myClass) = {
        if (r1.value > r2.value) r2 else r1
      }
    }
    

    because before, the method body was a lambda, so I was returning a Function2 instead of a myClass, as Arvid Heise mentioned.
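
    The mistake is easy to reproduce outside Flink. The sketch below uses a simplified stand-in trait (SimpleReduce, not Flink's actual ReduceFunction) and a hypothetical MyClass, just to show the difference between a body that evaluates to an element and a body that evaluates to a Function2:

    ```scala
    // Simplified stand-in for Flink's ReduceFunction[T] (illustrative only).
    trait SimpleReduce[T] {
      def reduce(r1: T, r2: T): T
    }

    case class MyClass(key: String, value: Int)

    object ReduceExample {
      // Broken shape: a lambda as the body. Its type is
      // (MyClass, MyClass) => MyClass, i.e. a Function2, not MyClass,
      // so with an explicit return type it does not compile:
      //   override def reduce(r1: MyClass, r2: MyClass): MyClass =
      //     (a: MyClass, b: MyClass) => if (a.value > b.value) b else a  // type error

      // Correct shape: evaluate the comparison and return an element directly.
      val minVal: SimpleReduce[MyClass] = new SimpleReduce[MyClass] {
        override def reduce(r1: MyClass, r2: MyClass): MyClass =
          if (r1.value > r2.value) r2 else r1
      }

      def main(args: Array[String]): Unit = {
        println(ReduceExample.minVal.reduce(MyClass("k", 5), MyClass("k", 3))) // MyClass(k,3)
      }
    }
    ```

    The explicit `: MyClass` return type on the override is what surfaces the mistake at the definition site instead of as a confusing overload-resolution error at the `.reduce(...)` call site.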