Here is the data frame. Basically, there are two possible types, X and Y, for different sources A, B, C, D, ...
Source | Type
-------------------
A | X
A | Y
B | X
C | Y
A | X
D | Y
... | ...
The ultimate goal is to compute a ratio per source: ratio(X in A) = count(X in A) / (count(X in A) + count(Y in A))
So, in our example, ratio(X in A) = 2 / (2 + 1) = 2 / 3 ≈ 0.667
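To make the target numbers concrete, here is a minimal plain-Java sketch of the same arithmetic, with no Spark involved. The class name `RatioSketch`, the method `ratioOfX`, and the `{source, type}` array layout are made up for illustration, and only the six rows shown in the table are used (the elided rows are left out).

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RatioSketch {
    // Hypothetical helper: each row is {source, type}.
    // Returns, per source, count(X) / (count(X) + count(Y)).
    public static Map<String, Double> ratioOfX(String[][] rows) {
        // counts[0] = number of X rows, counts[1] = number of all rows for the source
        Map<String, long[]> counts = new LinkedHashMap<>();
        for (String[] row : rows) {
            long[] c = counts.computeIfAbsent(row[0], k -> new long[2]);
            if ("X".equals(row[1])) {
                c[0]++;
            }
            c[1]++;
        }
        Map<String, Double> ratios = new LinkedHashMap<>();
        for (Map.Entry<String, long[]> e : counts.entrySet()) {
            long[] c = e.getValue();
            ratios.put(e.getKey(), (double) c[0] / c[1]);
        }
        return ratios;
    }

    public static void main(String[] args) {
        // The six visible rows of the table above.
        String[][] rows = {
            {"A", "X"}, {"A", "Y"}, {"B", "X"},
            {"C", "Y"}, {"A", "X"}, {"D", "Y"}
        };
        System.out.println(ratioOfX(rows));
    }
}
```

On these rows, A comes out at about 0.667, B at 1.0, and C and D at 0.0. That is the per-source result I would like the DataFrame query to produce.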
It looks very simple, but I can't figure out how to do this in a single select using only the Streaming API v2 (Structured Streaming, DataFrames).
From my perspective it's feasible only through the map functions...
This is what I have now: separate counts for the X and Y types.
msgDataFrame
    .select(SOURCE, TYPE)
    .where(msgDataFrame.col(TYPE).equalTo("X"))
    .groupBy(SOURCE)
    .count()
    .show(); // <-- gives me the count of 'X'-es per source

msgDataFrame
    .select(SOURCE, TYPE)
    .filter(msgDataFrame.col(TYPE).equalTo("Y"))
    .groupBy(SOURCE)
    .count()
    .show(); // <-- gives me the count of 'Y'-s per source
This should work:
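import org.apache.spark.sql.functions.{col, count}

msgDataFrame
  .select("SOURCE", "TYPE")
  .groupBy("SOURCE").pivot("TYPE", Seq("X", "Y")).agg(count(col("TYPE")))
  .na.fill(0, Seq("X", "Y")) // pivot leaves null where a source has no rows of a type
  .withColumn("Ratio", col("X") / (col("X") + col("Y")))
  .show()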