
Apache Flink DataSet difference/subtraction operation


I'm currently trying to implement some algorithms in both Apache Spark and Apache Flink. When executing the algorithms, I need to perform set difference (subtraction) operations.

While Apache Spark has a built-in subtract operation, I couldn't find anything similar in Apache Flink (1.0.3 and 1.1.0-SNAPSHOT).

So my question is: given two DataSet objects d1 and d2, both containing elements of the same type T, what is the most efficient way to compute the set difference, i.e. d1 \ d2?

val d1: DataSet[T] = ...
val d2: DataSet[T] = ...
val d_diff: DataSet[T] = ???

There is probably a way to do it via coGroup:

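import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

// Group both inputs on field 0; within each group, emit every left element
// that does not also occur (as a whole element) among the right elements.
// Explicit lambda parameter types avoid "missing parameter type" errors,
// since the Scala coGroup apply method is overloaded.
val d_diff = d1.coGroup(d2).where(0).equalTo(0) {
  (l: Iterator[T], r: Iterator[T], out: Collector[T]) =>
    val rightElements = r.toSet
    for (el <- l)
      if (!rightElements.contains(el)) out.collect(el)
}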

but I'm wondering whether this is the correct approach, or even best practice. Does anybody know a more efficient way?
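To sanity-check the per-group logic of the coGroup function above without a Flink cluster, here is a plain-Scala simulation with no Flink dependency. It assumes T is a pair keyed on its first field, mimics `coGroup(...).where(0).equalTo(0)` by grouping both sides on that field, and applies the same membership check per group. The object and method names (`CoGroupDiffSketch`, `coGroupDiff`) are hypothetical and exist only for illustration.

```scala
object CoGroupDiffSketch {
  // Simulates d1.coGroup(d2).where(0).equalTo(0) { ... } on local collections.
  // Elements are grouped by their first field (the coGroup key); for each key,
  // left elements are emitted unless the same whole element occurs on the right.
  def coGroupDiff[K, V](d1: Seq[(K, V)], d2: Seq[(K, V)]): Seq[(K, V)] = {
    val left = d1.groupBy(_._1)
    val right = d2.groupBy(_._1)
    left.toSeq.flatMap { case (key, l) =>
      // A key absent from d2 gets an empty right group, as in a real coGroup.
      val rightElements = right.getOrElse(key, Seq.empty[(K, V)]).toSet
      l.filterNot(rightElements.contains)
    }
  }

  def main(args: Array[String]): Unit = {
    val d1 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d"))
    val d2 = Seq((1, "a"), (2, "x"), (4, "e"))
    // Sorted because grouping does not preserve input order.
    println(coGroupDiff(d1, d2).toList.sorted) // List((1,b), (2,c), (3,d))
  }
}
```

Keying on field 0 only narrows the search; correctness comes from comparing whole elements inside each group, so duplicates of left elements that are absent from d2 are all kept.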


Solution

  • The DataSet API does not provide a method for this because it only contains a very basic set of operations. The Table API in 1.1 will have a set minus operator. Its implementation boils down to a coGroup keyed on the full element:

      leftDataSet
        .coGroup(rightDataSet)
        .where("*")
        .equalTo("*")
        .`with`(coGroupFunction)
    

    Here, coGroupFunction is a dedicated CoGroupFunction. So yes, you are on the right track.
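With `"*"` as the key, every group holds only copies of a single value, so the CoGroupFunction just has to decide how many left copies to emit. The sketch below simulates that with plain Scala collections (no Flink dependency) and shows three possible per-group policies. The names (`SetMinusSketch`, `coGroupWholeElement`, `minusDistinct`, `minusKeepDuplicates`, `minusAll`) are hypothetical, and this is not claimed to mirror the Table API's function line by line.

```scala
object SetMinusSketch {
  // Simulates coGroup(...).where("*").equalTo("*"): the key is the whole
  // element, so every group contains only copies of one value.
  def coGroupWholeElement[T](left: Seq[T], right: Seq[T])(
      f: (Seq[T], Seq[T]) => Seq[T]): Seq[T] = {
    val l = left.groupBy(identity)
    val r = right.groupBy(identity)
    (l.keySet ++ r.keySet).toSeq.flatMap { key =>
      f(l.getOrElse(key, Seq.empty[T]), r.getOrElse(key, Seq.empty[T]))
    }
  }

  // Set semantics: one copy of each left value that is absent on the right.
  def minusDistinct[T](l: Seq[T], r: Seq[T]): Seq[T] =
    if (r.isEmpty) l.take(1) else Seq.empty

  // Keep every left copy when the right group is empty (duplicates survive).
  def minusKeepDuplicates[T](l: Seq[T], r: Seq[T]): Seq[T] =
    if (r.isEmpty) l else Seq.empty

  // Bag semantics: each right copy cancels exactly one left copy.
  def minusAll[T](l: Seq[T], r: Seq[T]): Seq[T] =
    l.drop(r.size)

  def main(args: Array[String]): Unit = {
    val d1 = Seq(1, 1, 2, 3, 3, 3)
    val d2 = Seq(2, 3, 4)
    println(coGroupWholeElement(d1, d2)(minusDistinct[Int]).toList.sorted)       // List(1)
    println(coGroupWholeElement(d1, d2)(minusKeepDuplicates[Int]).toList.sorted) // List(1, 1)
    println(coGroupWholeElement(d1, d2)(minusAll[Int]).toList.sorted)            // List(1, 1, 3, 3)
  }
}
```

`minusKeepDuplicates` produces the same result as the coGroup function in the question. The advantage of keying on the whole element is that no per-group `Set` is needed: deciding what to emit only requires knowing whether (or how often) the right group has elements, so a Flink CoGroupFunction written this way can work directly on the group iterators instead of materializing `r.toSet`.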