python, scala, numpy, apache-flink, entropy

Filter a DataSet by the values of another DataSet in Flink (Scala)


I am trying to replicate this Python code:

cond_entropy_x = np.array([entropy(x[y == v]) for v in uy])

Here x and y are vectors of the same length, and uy holds the unique values of y, for example 0 and 1.
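For reference, here is a dependency-free sketch of what that line computes. It assumes `entropy` is the Shannon entropy of the empirical distribution of the values, measured in bits; the original `entropy` function is not shown, so both the definition and the log base are assumptions, and the sample data is made up.

```python
from collections import Counter
from math import log2


def entropy(values):
    """Shannon entropy (in bits) of the empirical distribution of `values`.

    Assumed definition; the question does not show the original function.
    """
    n = len(values)
    counts = Counter(values)
    return -sum((c / n) * log2(c / n) for c in counts.values())


# Hypothetical sample data: x holds feature values, y holds the labels.
x = [1, 1, 2, 2, 3, 4, 5, 6]
y = [0, 0, 0, 0, 1, 1, 1, 1]

# Unique labels, playing the role of uy in the NumPy snippet.
uy = sorted(set(y))

# Pure-Python equivalent of entropy(x[y == v]) for each v in uy:
# keep the x values whose paired label equals v, then take their entropy.
cond_entropy_x = [
    entropy([xi for xi, yi in zip(x, y) if yi == v])
    for v in uy
]
```

The key point is that x and y are matched position by position, which is exactly the pairing that two independent DataSets do not provide.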

In Flink, I have:

val uy = y.distinct.collect
val condHx = for (i ← uy)
    yield entropy(x.filterWithBcVariable(y)((_, yy) ⇒ yy == i))

However, filterWithBcVariable does not seem to see every value in y; it only receives the first one.

I've also tried:

for (i ← values) yield y.join(x).where(a ⇒ a).equalTo(_ ⇒ i)

But that ran out of memory.

How can I filter x by the values in y?

Something like x.zip(y) would do it, but that is not supported.

Any ideas?


Solution

  • I came up with a solution. It may not be the best, but at least it works.

    Instead of passing x and y as separate DataSets, I now pass a single DataSet[LabeledVector] whose vector holds only one column:

    val xy = input.map(lv ⇒ LabeledVector(lv.label, DenseVector(lv.vector(0))))
    

    Then I pass xy to my function:

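    def conditionalEntropy(xy: DataSet[LabeledVector]): Double = {
      // Extract the label column
      val y = xy map (_.label)
      // Label probabilities as a Breeze vector (probs is a helper defined elsewhere)
      val p = probs(y).toArray.asBreeze
      // Unique label values, collected to the driver as a local Seq
      val values = y.distinct.collect
      // Entropy of the rows matching each label value (entropy is a helper defined elsewhere)
      val condH = for (i ← values)
        yield entropy(xy.filter(_.label == i))
      // Weight each per-label entropy by its label probability and sum
      p.dot(seq2Breeze(condH))
    }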
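To make the final dot product concrete, here is a small Python sketch of the same quantity on paired (label, value) records. It is not the Flink code: it groups the values by label in a single pass rather than filtering once per label, and it assumes `entropy` is the Shannon entropy (in bits) of the empirical distribution, which the answer does not show. The function name `conditional_entropy` and the sample data are hypothetical.

```python
from collections import Counter, defaultdict
from math import log2


def entropy(values):
    """Shannon entropy in bits of the empirical distribution (assumed definition)."""
    n = len(values)
    return -sum((c / n) * log2(c / n) for c in Counter(values).values())


def conditional_entropy(xy):
    """Weighted sum of per-label entropies for a list of (label, value) pairs."""
    # Group the values by label in one pass over the paired records.
    groups = defaultdict(list)
    for label, value in xy:
        groups[label].append(value)

    n = len(xy)
    # Weight each group's entropy by the label's relative frequency,
    # which corresponds to p.dot(condH) in the Scala function.
    return sum(len(vals) / n * entropy(vals) for vals in groups.values())


# Hypothetical sample data: four records per label.
xy = [(0, 1), (0, 1), (0, 2), (0, 2), (1, 3), (1, 4), (1, 5), (1, 6)]
h = conditional_entropy(xy)
```

With this data each label has probability 0.5, and the per-label entropies are 1 bit and 2 bits, so the weighted sum is 1.5 bits. Keeping the label and the value in the same record is what makes the per-label selection possible without a zip or a join.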