
Is it possible to avoid cross transformation?


Sirs,

I’m working with Batch in Apache Flink, using the DataSet API, and I want to calculate the ’similarity’ of all elements in a DataSet.

Let the function CalculateSimilarity(e1, e2) calculate and return the similarity of elements e1 and e2.

Crossing a DataSet with itself works fine, but it wastes a lot of time and processing on unnecessary calculations. I don't really need the cartesian product of ALL elements, because two improvements are possible:

i) There is no need to compute the similarity of an element with itself, e.g. CalculateSimilarity(A, A).
ii) CalculateSimilarity(A, B) ⇔ CalculateSimilarity(B, A). The similarities (A, B) and (B, A) are equivalent, so I only need to compute one of them.

Using Flink, how can I apply a transformation that computes only the necessary similarities instead of all of them (cross)?

If I wasn't clear above, here is a quick example:
Dt = DataSet with 4 elements.
Dt = {e1, e2, e3, e4}.
If I use cross ( Dt.cross(Dt) ), it returns all of these combinations: ((e1,e1),(e1,e2),(e1,e3),(e1,e4),(e2,e1),(e2,e2),(e2,e3),(e2,e4),(e3,e1),...,(e4,e4)).
However, I just need these combinations: (e1,e2),(e1,e3),(e1,e4),(e2,e3),(e2,e4),(e3,e4).
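The set of combinations asked for here is the strict upper triangle of the cartesian product. As a minimal plain-Scala sketch (no Flink, assuming the elements sit in a local collection), it can be written as:

```scala
// All unordered pairs of distinct elements: for each index i,
// pair it only with the later indices j > i.
val elements = Vector("e1", "e2", "e3", "e4")

val wanted = for {
  i <- elements.indices
  j <- (i + 1) until elements.length
} yield (elements(i), elements(j))

// yields (e1,e2), (e1,e3), (e1,e4), (e2,e3), (e2,e4), (e3,e4)
```

For n elements this produces n(n-1)/2 pairs (6 for n = 4) instead of the n² = 16 pairs that cross computes.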

Thanks for helping!


Solution

  • What you can do is to manually construct a join pattern which avoids the duplicate permutations. You can do this by assigning each element an increasing index (0 to number of elements - 1) and then letting each element join only with elements whose index is strictly lower than its own, which also excludes the self-pairs:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.utils._ // provides zipWithIndex on DataSet
    
    val env = ExecutionEnvironment.getExecutionEnvironment
    
    val input = env.fromElements(1, 2, 3, 4, 5, 6).rebalance()
    
    // first assign an increasing index from 0 to input.size - 1 to each element
    val indexedInput = input.zipWithIndex
    
    // generate the join set: each (idx, element) is emitted once for every
    // strictly smaller index, so an element is only paired with elements
    // that come before it (no self-pairs, no (B, A) duplicates)
    val joinSet = indexedInput.flatMap {
      input => for (i <- 0L until input._1) yield (i, input._2)
    }
    
    // join on the index to build the pairs
    val resultSet = indexedInput.join(joinSet).where(_._1).equalTo(_._1).apply {
      (a, b) => (a._2, b._2)
    }
    

    You should benchmark both variants (the plain cross and this join pattern), because zipWithIndex triggers a separate job execution to count the elements per partition.
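To convince yourself that the index-and-join pattern produces exactly the pairs the question asks for, the same logic can be emulated on a local collection (plain Scala, no Flink; the names are illustrative only):

```scala
// Emulate the Flink program locally: assign indices, expand each element
// into (i, element) for every strictly smaller index i, then join on the index.
val indexed = Vector("e1", "e2", "e3", "e4").zipWithIndex.map(_.swap) // (idx, elem)

val joinSet = indexed.flatMap { case (idx, elem) =>
  (0 until idx).map(i => (i, elem)) // strict bound: excludes the element's own index
}

val result = for {
  (i, a) <- indexed
  (j, b) <- joinSet
  if i == j
} yield (a, b)

// result contains each unordered pair exactly once: 4 * 3 / 2 = 6 pairs
```

The join set is the only part that grows: for n elements it holds n(n-1)/2 records, so the join produces exactly the strict upper triangle and nothing else.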