I am trying to replicate this Python code:

```python
cond_entropy_x = np.array([entropy(x[y == v]) for v in uy])
```

where `x` and `y` are vectors and `uy` holds the unique values of `y`, for example `0` and `1`.
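For reference, here is a small self-contained NumPy version of what that line computes. It is a sketch that assumes `entropy` is the Shannon entropy (in bits) of the empirical distribution of the values it receives. The helper below is hypothetical, since the original `entropy` is not shown. Note that `scipy.stats.entropy` would instead treat its argument as a probability vector. The boolean mask `y == v` only works because `x` and `y` are aligned element by element, which is exactly what is hard to reproduce with two separate Flink `DataSet`s.

```python
import numpy as np

def entropy(values):
    # Hypothetical helper: Shannon entropy (base 2) of the empirical
    # distribution of the values in `values`.
    _, counts = np.unique(values, return_counts=True)
    p = counts / counts.sum()
    return float((p * np.log2(1.0 / p)).sum())

x = np.array([1, 1, 1, 2, 2, 3])
y = np.array([0, 0, 0, 1, 1, 1])
uy = np.unique(y)  # unique values of y: [0, 1]

# One entropy per unique value v of y, computed on the x entries where y == v.
# The mask relies on x[k] and y[k] belonging to the same sample.
cond_entropy_x = np.array([entropy(x[y == v]) for v in uy])
print(cond_entropy_x)
```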
In Flink, I have:

```scala
val uy = y.distinct.collect
val condHx = for (i ← uy)
  yield entropy(x.filterWithBcVariable(y)((_, yy) ⇒ yy == i))
```
However, it seems that `filterWithBcVariable` does not see every value of `y`; it only takes the first one.
I've also tried:

```scala
for (i ← values) yield y.join(x).where(a ⇒ a).equalTo(_ ⇒ i)
```

but I ran out of memory.
How could I filter `x` based on the values of `y`? Something like `x.zip(y)` would do it, but it is not supported. Any ideas?
I came up with one solution. It may not be the best, but at least it works.
Now, instead of passing `x` and `y` as separate `DataSet`s, I am passing a `DataSet[LabeledVector]` with only one feature column:
```scala
val xy = input.map(lv ⇒ LabeledVector(lv.label, DenseVector(lv.vector(0))))
```
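The idea behind this change is that each record carries its own label, so filtering on the label can never misalign `x` and `y`, which is what `zip` would have provided. A plain-Python sketch of the same restructuring, using hypothetical `(label, features)` tuples as a stand-in for `LabeledVector`:

```python
# Hypothetical input: (label, feature vector) pairs standing in for LabeledVector.
data = [
    (0.0, [1.0, 10.0]),
    (0.0, [1.0, 20.0]),
    (1.0, [2.0, 30.0]),
    (1.0, [3.0, 40.0]),
]

# Keep the label and only the first feature, mirroring
# LabeledVector(lv.label, DenseVector(lv.vector(0))).
xy = [(label, features[0]) for label, features in data]

# Because label and value travel together in one record, filtering on the
# label selects exactly the x values that belong to that label.
labels = sorted({label for label, _ in xy})
x_by_label = {v: [x for label, x in xy if label == v] for v in labels}
print(x_by_label)
```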
Then I pass `xy` to my function:
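```scala
def conditionalEntropy(xy: DataSet[LabeledVector]): Double = {
  // Get the label
  val y = xy map (_.label)
  // Get probs for the label
  // (assumes probs(y) returns the probabilities in the same order as `values` below)
  val p = probs(y).toArray.asBreeze
  // Get unique values in label
  val values = y.distinct.collect
  // Compute Conditional Entropy: one entropy per label value
  val condH = for (i ← values)
    yield entropy(xy.filter(_.label == i))
  // H(X|Y) = sum over labels i of P(Y = i) * H(X | Y = i)
  p.dot(seq2Breeze(condH))
}
```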
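To check the Flink result on a small sample, a plain NumPy version of the same quantity can be handy. This is a sketch under the same assumption about `entropy` (empirical Shannon entropy in bits); `entropy` and `conditional_entropy` here are illustrative names, not the helpers from the Flink code. It also makes explicit something the dot product relies on: each probability in `p` must be paired with the entropy computed for the same label.

```python
import numpy as np

def entropy(values):
    # Hypothetical helper: Shannon entropy (base 2) of the empirical
    # distribution of `values`.
    _, counts = np.unique(values, return_counts=True)
    p = counts / counts.sum()
    return float((p * np.log2(1.0 / p)).sum())

def conditional_entropy(x, y):
    # H(X|Y) = sum over unique v of P(Y = v) * H(X | Y = v).
    # np.unique returns the unique labels and their counts in the same
    # (sorted) order, so p_y[k] always belongs to uy[k].
    uy, counts = np.unique(y, return_counts=True)
    p_y = counts / counts.sum()
    cond_h = np.array([entropy(x[y == v]) for v in uy])
    return float(p_y.dot(cond_h))

x = np.array([1, 1, 1, 2, 2, 3])
y = np.array([0, 0, 0, 1, 1, 1])
print(conditional_entropy(x, y))
```

In the Flink function, `probs(y)` and `y.distinct.collect` are computed separately, so it is worth confirming that they produce labels in the same order before taking `p.dot(...)`.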