I'm currently implementing some algorithms in both Apache Spark and Apache Flink. These algorithms require some kind of set difference/subtraction operation. While Apache Spark has a built-in subtract operation, I couldn't find anything similar in Apache Flink (1.0.3 and 1.1.0-SNAPSHOT).
So my question is: given two DataSet objects d1 and d2, both containing the same type T, what is the most efficient way to compute the set difference d1 \ d2?
val d1: DataSet[T] = ...
val d2: DataSet[T] = ...
val d_diff: DataSet[T] = ???
Probably there is some way to do it via coGroup:
val d_diff = d1.coGroup(d2).where(0).equalTo(0) {
  (l, r, out: Collector[T]) =>
    val rightElements = r.toSet
    for (el <- l if !rightElements.contains(el))
      out.collect(el)
}
but I'm wondering whether that's the correct way or even best practice, or does anybody know a more efficient approach?
The DataSet API does not provide a method for this, as it only contains a very basic set of operations. The Table API in 1.1 will have a set minus operator. You can see how it is implemented here.
leftDataSet
.coGroup(rightDataSet)
.where("*")
.equalTo("*")
.`with`(coGroupFunction)
It uses this CoGroupFunction. So yes, you are on the right track.
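For illustration, here is a plain-Scala sketch (no Flink dependency) of the semantics that such a coGroup-based minus implements: both sides are grouped by the full element, and a left element is emitted only if the corresponding right group is empty. The helper name minus is my own choice, not a Flink API:

```scala
// Sketch of the coGroup-based set-minus semantics on local collections.
// In Flink, where("*").equalTo("*") keys on the entire element; here we
// model the right side's key groups with a Set.
def minus[T](left: Seq[T], right: Seq[T]): Seq[T] = {
  val rightElements = right.toSet
  // An element of `left` survives only if no equal element exists on the
  // right; left-side duplicates are preserved, matching the coGroup
  // function that emits the whole left group when the right group is empty.
  left.filterNot(rightElements.contains)
}
```

Note that this keeps duplicates on the left side (e.g. minus(Seq(1, 2, 2, 3), Seq(2, 4)) yields Seq(1, 3)); if you want distinct results, apply distinct() on the output DataSet afterwards.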