Tags: java, apache-spark, dataframe, spark-streaming

Find ratio within the grouped data in DataFrame


Here is the DataFrame. There are two possible types, X and Y, for different sources A, B, C, D, ...

Source | Type
-------------------
  A    |  X
  A    |  Y
  B    |  X
  C    |  Y
  A    |  X
  D    |  Y
 ...   | ...

The goal is to compute, for each source, the ratio of X rows: ratio(X in A) = count(X) / (count(X) + count(Y))

So, in our example, ratio(X in A) = 2 / (2 + 1) = 2 / 3 ~ 0.667
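To make the formula concrete, here is a small plain-Java sketch (no Spark involved) that applies it to the sample rows above in a single pass, keeping an X count and a Y count per source. The class and method names (`XRatio`, `ratioOfX`) are hypothetical, and each row is assumed to be a `{source, type}` string pair.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Spark) of ratio(X in S) = count(X) / (count(X) + count(Y)).
// Hypothetical names; each row is assumed to be {source, type}.
class XRatio {

    static Map<String, Double> ratioOfX(List<String[]> rows) {
        // source -> {countX, countY}, in first-seen order
        Map<String, long[]> counts = new LinkedHashMap<>();
        for (String[] row : rows) {
            long[] c = counts.computeIfAbsent(row[0], k -> new long[2]);
            if ("X".equals(row[1])) {
                c[0]++;
            } else if ("Y".equals(row[1])) {
                c[1]++;
            } // any other type is ignored
        }

        Map<String, Double> ratios = new LinkedHashMap<>();
        for (Map.Entry<String, long[]> e : counts.entrySet()) {
            long x = e.getValue()[0];
            long y = e.getValue()[1];
            // Cast to double: with long arithmetic, 2 / (2 + 1) would be 0.
            // NaN only if a source had neither X nor Y rows.
            ratios.put(e.getKey(), x + y == 0 ? Double.NaN : (double) x / (x + y));
        }
        return ratios;
    }

    public static void main(String[] args) {
        // The rows shown in the sample table
        List<String[]> rows = List.of(
            new String[] {"A", "X"},
            new String[] {"A", "Y"},
            new String[] {"B", "X"},
            new String[] {"C", "Y"},
            new String[] {"A", "X"},
            new String[] {"D", "Y"});
        System.out.println(ratioOfX(rows));
    }
}
```

Running `main` prints a ratio for every source: A comes out as 2/3, B (only X rows) as 1.0, and C and D (only Y rows) as 0.0.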

It seems simple, but I can't figure out how to do this in a single select using only Streaming API v2 (Structured Streaming, DataFrames). As far as I can tell, it's only feasible with map functions.

This is what I have now: separate counts for the X and Y types.

        msgDataFrame
                .select(SOURCE, TYPE)
                .where(msgDataFrame.col(TYPE).equalTo("X"))
                .groupBy(SOURCE)
                .count()
                .show(); // <-- gives me count for 'X'-es

        msgDataFrame
                .select(SOURCE, TYPE)
                .filter(msgDataFrame.col(TYPE).equalTo("Y"))
                .groupBy(SOURCE)
                .count()
                .show(); // <-- gives me count for 'Y'-es
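The two queries above return separate results, so computing the ratio still requires matching them up by source. The plain-Java sketch below (hypothetical names, no Spark) shows what that merge involves, using the counts the rows shown in the sample table would produce: a source that appears in only one result, such as B (only X rows) or C and D (only Y rows), has to be treated as having a count of 0 for the other type, much like a full outer join on the source followed by filling missing counts with 0.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java sketch (no Spark): combine separate per-type counts into one ratio per source.
// Hypothetical names; inputs mimic the two groupBy(SOURCE).count() results.
class MergeCounts {

    // Assumes every count present is >= 1 (as grouped counts are), so x + y > 0 below.
    static Map<String, Double> ratioOfX(Map<String, Long> xCounts, Map<String, Long> yCounts) {
        // Union of sources from both results, like a full outer join on SOURCE
        TreeSet<String> sources = new TreeSet<>(xCounts.keySet());
        sources.addAll(yCounts.keySet());

        Map<String, Double> ratios = new TreeMap<>();
        for (String source : sources) {
            // Missing from a result means no rows of that type: count it as 0
            long x = xCounts.getOrDefault(source, 0L);
            long y = yCounts.getOrDefault(source, 0L);
            ratios.put(source, (double) x / (x + y));
        }
        return ratios;
    }

    public static void main(String[] args) {
        Map<String, Long> xCounts = Map.of("A", 2L, "B", 1L);          // 'X' counts for the rows shown
        Map<String, Long> yCounts = Map.of("A", 1L, "C", 1L, "D", 1L); // 'Y' counts for the rows shown
        System.out.println(ratioOfX(xCounts, yCounts));
    }
}
```

Without the default of 0, B, C and D would have no ratio at all, because each of them is absent from one of the two results.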

Solution

  • This should work (the snippet is Scala):

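    import org.apache.spark.sql.functions.{col, count}

    msgDataFrame
      .select("SOURCE", "TYPE")
      .groupBy("SOURCE")
      .pivot("TYPE", Seq("X", "Y")) // one count column per listed type: X and Y
      .agg(count(col("TYPE")))
      .na.fill(0) // a pivoted count can be null when a source has no rows of that type; use 0
      .withColumn("Ratio", col("X") / (col("X") + col("Y")))
      .show()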