Tags: scala, aggregate, apache-flink, fold, flink-streaming

Flink: How to convert the deprecated fold to aggregate?


I am following the quick start example of Flink: Monitoring the Wikipedia Edit Stream.

The example is in Java, and I am implementing it in Scala as follows:

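import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource}

/**
 * Wikipedia Edit Monitoring
 */
object WikipediaEditMonitoring {
  def main(args: Array[String]): Unit = {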
    // set up the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)

    val result = edits.keyBy( _.getUser )
      .timeWindow(Time.seconds(5))
      .fold(("", 0L)) {
        (acc: (String, Long), event: WikipediaEditEvent) => {
          (event.getUser, acc._2 + event.getByteDiff)
        }
      }

    result.print

    // execute program
    env.execute("Wikipedia Edit Monitoring")
  }
}

However, the fold function in Flink is already deprecated, and the aggregate function is recommended instead.


However, I could not find an example or tutorial showing how to convert the deprecated fold to aggregate.

Any idea how to do this? It probably takes more than just calling aggregate.

UPDATE

I have another implementation, as follows:

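import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource}

/**
 * Wikipedia Edit Monitoring
 */
object WikipediaEditMonitoring {
  def main(args: Array[String]): Unit = {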
    // set up the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)

    val result = edits
      .map( e => UserWithEdits(e.getUser, e.getByteDiff) )
      .keyBy( "user" )
      .timeWindow(Time.seconds(5))
      .sum("edits")

    result.print

    // execute program
    env.execute("Wikipedia Edit Monitoring")
  }

  /** Data type pairing a user with the byte diff of their edits */
  case class UserWithEdits(user: String, edits: Long)
}

I would also like to know how to implement this with a user-defined AggregateFunction.

UPDATE

I followed this documentation: AggregateFunction, but have the following question:

In the source code of the AggregateFunction interface for release 1.3, add returns void:

void add(IN value, ACC accumulator);

But in the version 1.4 AggregateFunction, it returns the accumulator:

ACC add(IN value, ACC accumulator);

How should I handle this?

I am using Flink 1.3.2, and the documentation for this version does not cover AggregateFunction, but release 1.4 is not available in artifactory yet.



Solution

  • You will find some documentation for AggregateFunction in the Flink 1.4 docs, including an example.

    The version included in 1.3.2 can only be used with mutable accumulator types, where the add operation modifies the accumulator in place. This has been fixed for Flink 1.4, which has not been released yet.
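
As a rough sketch of what the conversion could look like against the Flink 1.4 signature (where add returns the accumulator), the fold from the question maps onto an AggregateFunction roughly as shown here. The class name SumEdits is hypothetical, and the input type reuses the UserWithEdits case class from the question's second version. This assumes flink-core 1.4 or later on the classpath and will not compile against 1.3.2.

```scala
import org.apache.flink.api.common.functions.AggregateFunction

// Same shape as the case class in the question's second version.
case class UserWithEdits(user: String, edits: Long)

// Hypothetical name. Sums byte diffs per user, assuming the 1.4+ interface
// in which add returns the (possibly new) accumulator.
class SumEdits extends AggregateFunction[UserWithEdits, (String, Long), (String, Long)] {

  // Plays the role of fold's initial value ("", 0L).
  override def createAccumulator(): (String, Long) = ("", 0L)

  // Plays the role of the function that was passed to fold.
  override def add(value: UserWithEdits, acc: (String, Long)): (String, Long) =
    (value.user, acc._2 + value.edits)

  // The accumulator already has the shape of the desired output.
  override def getResult(acc: (String, Long)): (String, Long) = acc

  // Combines two partial accumulators, e.g. when windows are merged.
  // Within a keyed window both sides carry the same user, so keep a non-empty one.
  override def merge(a: (String, Long), b: (String, Long)): (String, Long) =
    (if (a._1.nonEmpty) a._1 else b._1, a._2 + b._2)
}
```

Under the same assumptions, the windowed stream from the question would call `.aggregate(new SumEdits)` in place of `.sum("edits")`, with `org.apache.flink.streaming.api.scala._` imported so that the implicit TypeInformation instances are in scope. On 1.3.2, where add returns void, the accumulator would instead have to be a mutable object that add updates in place.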