scala, apache-spark, apache-spark-sql, hiveql, window-functions

Convert HiveQL into Spark Scala


I want to convert a HiveQL query with a window function into a Spark Scala query, but I keep getting the same exception.

Problem context: mytable consists of category and product fields. I want to get a list of the top N most frequent products for each category. The df below is a DataFrame obtained from a HiveContext.
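
For reference, here is a minimal sketch of how df could be obtained; the HiveContext variable name hiveContext and the pre-existing SparkContext sc are assumptions, not part of the original post:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)    // assumes an existing SparkContext `sc`
val df = hiveContext.table("mytable")    // DataFrame with `category` and `product` columns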

Original query (works correctly):

SELECT category, product, freq FROM (
    SELECT category, product, COUNT(*) AS freq, 
    ROW_NUMBER() OVER (PARTITION BY category ORDER BY COUNT(*) DESC) as seqnum
    FROM mytable GROUP BY category, product) ci 
WHERE seqnum <= 10;
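
As a sanity check, this HiveQL can be run unchanged through the HiveContext and used as a baseline to compare the DataFrame version against (hiveContext is the assumed handle from the sketch above):

// Run the original HiveQL as-is and keep the result for comparison
val expected = hiveContext.sql("""
  SELECT category, product, freq FROM (
    SELECT category, product, COUNT(*) AS freq,
           ROW_NUMBER() OVER (PARTITION BY category ORDER BY COUNT(*) DESC) AS seqnum
    FROM mytable GROUP BY category, product) ci
  WHERE seqnum <= 10""")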

What I have now (partially converted, doesn't work):

val w = row_number().over(Window.partitionBy("category").orderBy(count("*").desc))
val result = df.select("category", "product").groupBy("category", "product").agg(count("*").as("freq"))
val new_res = result.withColumn("seqNum", w).where(col("seqNum") <= 10).drop("seqNum")

I keep receiving the following exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: expression 'category' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;

What could be wrong here?


Solution

  • The mistake is using an aggregate expression in the orderBy clause:

    .orderBy(count("*").desc)
    

    Written like that, the expression introduces a new aggregate expression instead of referencing the one already computed in agg. Reference the existing aggregate column by name instead, keeping the descending order from the original query:

    .orderBy("freq")
    

    So your code should look like:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, count, row_number}

    // The window ranks products within each category by the already-computed freq
    val w = row_number().over(
      Window.partitionBy("category").orderBy(col("freq").desc))
    // Aggregate first, then apply the window over the aggregated result
    val result = df.select("category", "product")
      .groupBy("category", "product")
      .agg(count("*").as("freq"))
    val new_res = result
      .withColumn("seqNum", w).where(col("seqNum") <= 10)
      .drop("seqNum")
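
    As a quick check, the converted pipeline can be run on a small hand-built DataFrame; the sample data, the implicits import, and the reuse of `sc` and `hiveContext` below are illustrative assumptions, not part of the original answer:

    import hiveContext.implicits._

    // Tiny illustrative dataset with two categories
    val sample = sc.parallelize(Seq(
      ("books", "spark-guide"), ("books", "spark-guide"), ("books", "hive-guide"),
      ("music", "rock-cd"), ("music", "rock-cd"), ("music", "jazz-cd")
    )).toDF("category", "product")

    val top = sample
      .groupBy("category", "product")
      .agg(count("*").as("freq"))
      .withColumn("seqNum", w)        // reuses the window column defined above
      .where(col("seqNum") <= 10)
      .drop("seqNum")

    top.show()   // top products per category, computed the same way as new_res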