Tags: scala, apache-spark, apache-spark-sql

Spark Scala: cannot resolve column when using agg


I have the following code:

def proccess(spark: SparkSession, df : DataFrame): DataFrame= {
  val mydf = df.withColumn("next_intent_temp", when (col("next_intent")=== "val", 1).otherwise(0)
  
  val mynewdf = mydf.groupBy(col("a"), col("b"), col("c")
                .agg(sum(col("next_intent_temp")).as("next_intent"))
                .select(col("market"), struct(col("next_intent"),...).alias("data")
    )
  mynewdf 

}

Running it gives:

cannot resolve 'next_intent_temp' given input columns [...]; Aggregate [sum('next_intent_temp')...

I don't really understand this error, because when I do mydf.show() I do see the column next_intent_temp with the correct values. So why does the sum fail with this error?

Note: I simplified the code for the question, but preserved the structure of what I really have.


Solution

  • You have .as("next_intent"), so use that column name downstream of the agg.

    You also have some syntax problems, mostly missing )'s.

    Here is a simpler working example; if you follow it, you should be OK.

    import spark.implicits._
    import org.apache.spark.sql.functions._

    val df = spark.sparkContext.parallelize(Seq(
        (1, 7, "ST"), (1, 8, "XX"), (1, 9, "RW"),
        (3, 10, "ST"), (3, 11, "AA"), (3, 12, "RW"),
        (2, 3, "ST"), (2, 4, "TT")
      )).toDF("i", "c", "t")

    // Constant column standing in for next_intent_temp.
    val df2 = df.withColumn("next_intent_temp", lit(1))

    // The alias "next_intent" is the column name to reference after the agg.
    val df3 = df2.groupBy(col("i"), col("c"), col("t"))
                 .agg(sum(col("next_intent_temp")).as("next_intent"))
                 .select(col("i"), struct(col("next_intent")).alias("data"))

    df3.show(false)
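Applied back to the question's function, a sketch of the fix could look like the following. This is only an illustration under assumptions: the grouping keys are a, b, c; the struct keeps only next_intent (the original "..." is unknown); and col("a") replaces col("market"), since market is not among the columns the groupBy produces.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
import spark.implicits._

// Hypothetical corrected version of the question's function:
// balanced parentheses, and the aggregated column is referenced
// by its alias "next_intent" after the agg.
def process(df: DataFrame): DataFrame =
  df.withColumn("next_intent_temp", when(col("next_intent") === "val", 1).otherwise(0))
    .groupBy(col("a"), col("b"), col("c"))
    .agg(sum(col("next_intent_temp")).as("next_intent"))
    .select(col("a"), struct(col("next_intent")).alias("data"))

// Small sample input with the assumed schema.
val sample = Seq(
  ("a1", "b1", "c1", "val"),
  ("a1", "b1", "c1", "other"),
  ("a2", "b2", "c2", "val")
).toDF("a", "b", "c", "next_intent")

process(sample).show(false)
```

Note that sum over the 0/1 flag counts the "val" rows per group, so the first group yields next_intent = 1 despite having two rows.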