Tags: scala, apache-spark, cassandra, spark-cassandra-connector

Spark Cannot evaluate expression: lag of a window expression


I'm trying to perform a large number of operations on a DataFrame read from a Cassandra table and then save the result to another table. One of these operations is the following:

val leadWindow = Window.partitionBy(col("id")).orderBy(col("timestamp").asc).rowsBetween(Window.currentRow, 2)
df.withColumn("lead1", lag(sum(col("temp1")).over(leadWindow), 2, 0))
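For reference, here is a plain-Scala model (outside Spark, on made-up data) of what this expression is intended to compute within one partition ordered by timestamp: a rolling sum over the current row and the two following rows, then shifted down by two rows with a default of 0.

```scala
// Hypothetical temp1 values for a single id, in ascending timestamp order
val temp1 = Vector(1, 2, 3, 4, 5)

// sum(col("temp1")).over(leadWindow): current row plus the next two rows
val rollingSum = temp1.indices.map(i => temp1.slice(i, i + 3).sum)

// lag(_, 2, 0): the rolling sum from two rows earlier, defaulting to 0
val lead1 = rollingSum.indices.map(i => if (i >= 2) rollingSum(i - 2) else 0)

println(rollingSum) // Vector(6, 9, 12, 9, 5)
println(lead1)      // Vector(0, 0, 6, 9, 12)
```

In other words, `lead1` at each row should be the sum of that row's `temp1` and the two preceding rows' values, which is what the exception prevents this formulation from evaluating.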

When I run my job, I get an exception saying that the lag expression cannot be evaluated:

2018-10-08 12:02:22 INFO  Cluster:1543 - New Cassandra host /127.0.0.1:9042 added
    2018-10-08 12:02:22 INFO  CassandraConnector:35 - Connected to Cassandra cluster: Test Cluster
    2018-10-08 12:02:23 INFO  CassandraSourceRelation:35 - Input Predicates: [IsNotNull(ts)]
    2018-10-08 12:02:23 INFO  CassandraSourceRelation:35 - Input Predicates: [IsNotNull(ts)]
    Exception in thread "main" java.lang.UnsupportedOperationException: Cannot evaluate expression: lag(input[43, bigint, true], 2, 0)
            at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:258)
            at org.apache.spark.sql.catalyst.expressions.OffsetWindowFunction.doGenCode(windowExpressions.scala:326)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
            at scala.Option.getOrElse(Option.scala:121)
            at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
            at org.apache.spark.sql.catalyst.expressions.BinaryExpression.nullSafeCodeGen(Expression.scala:496)
            at org.apache.spark.sql.catalyst.expressions.BinaryExpression.defineCodeGen(Expression.scala:479)
            at org.apache.spark.sql.catalyst.expressions.Add.doGenCode(arithmetic.scala:174)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
            at scala.Option.getOrElse(Option.scala:121)
            at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
            at org.apache.spark.sql.catalyst.expressions.BinaryExpression.nullSafeCodeGen(Expression.scala:496)
            at org.apache.spark.sql.catalyst.expressions.BinaryExpression.defineCodeGen(Expression.scala:479)
            at org.apache.spark.sql.catalyst.expressions.BinaryComparison.doGenCode(predicates.scala:513)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
            at scala.Option.getOrElse(Option.scala:121)
            at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
            at org.apache.spark.sql.catalyst.expressions.And.doGenCode(predicates.scala:397)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
            at scala.Option.getOrElse(Option.scala:121)
            at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
            at org.apache.spark.sql.catalyst.expressions.CaseWhen$$anonfun$8.apply(conditionalExpressions.scala:202)
            at org.apache.spark.sql.catalyst.expressions.CaseWhen$$anonfun$8.apply(conditionalExpressions.scala:201)
            at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
            at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
            at scala.collection.immutable.List.foreach(List.scala:381)
            at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
            at scala.collection.immutable.List.map(List.scala:285)
            at org.apache.spark.sql.catalyst.expressions.CaseWhen.doGenCode(conditionalExpressions.scala:201)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
            at scala.Option.getOrElse(Option.scala:121)
            at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
            at org.apache.spark.sql.catalyst.expressions.Alias.genCode(namedExpressions.scala:142)
            at org.apache.spark.sql.execution.ProjectExec$$anonfun$6.apply(basicPhysicalOperators.scala:60)
            at org.apache.spark.sql.execution.ProjectExec$$anonfun$6.apply(basicPhysicalOperators.scala:60)
            at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
            at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
            at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
            at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
            at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
            at scala.collection.AbstractTraversable.map(Traversable.scala:104)
            at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:60)
            at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
            at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:354)
            at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:383)
            at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
            at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
            at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
            at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:354)
            at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
            at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
            at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
            at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
            at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
            at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:524)
            at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:576)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
            at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
            at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:89)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
            at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
            at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
            at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
            at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2975)
            at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2973)
            at org.apache.spark.sql.cassandra.CassandraSourceRelation.insert(CassandraSourceRelation.scala:76)
            at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:86)
            at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
            at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
            at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
            at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
            at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
            at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
            at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
            at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
            at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
            at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
            at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
            at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
            at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
            at com.test.functions.package$ChecksFunctions.appendToTable(package.scala:66)
            at com.test.TestFromCassandra$.main(TestFromCassandra.scala:66)
            at com.test.TestFromCassandra.main(TestFromCassandra.scala)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
            at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
            at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
            at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
            at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
            at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    2018-10-08 12:02:31 INFO  CassandraConnector:35 - Disconnected from Cassandra cluster: Test Cluster

The last application frame in the trace, TestFromCassandra.scala:66, is the call to the save() function. I didn't find any similar issue on Stack Overflow.

Does someone know why I'm encountering this exception? Does the lag function have any limitation when combined with a rolling sum?

EDIT: I found a similar issue on Spark's Jira. There seems to be a bug when a filter follows a reference to a window function, and since the Cassandra connector filters DataFrames on the primary key columns (using IsNotNull) before saving, this might be what causes the exception. Is there a way to perform this operation while avoiding this bug, but without using an aggregation function? Or does someone know how to fix it?

EDIT 2: I also tried to store my DataFrame using a foreach writer and the connector's withSessionDo function, but I still get the same exception. Has no one ever encountered this issue?

EDIT 3: I found another way to achieve the operation I wanted:

val leadWindow = Window.partitionBy(col("id")).orderBy(col("timestamp").desc).rowsBetween(Window.currentRow, 2)
df.withColumn("lead1", sum(col("temp1")).over(leadWindow))

So the problem wasn't due to a filter after all; it seems it is simply not possible to apply the lag function to a window expression.
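On made-up data outside Spark, the two formulations can be checked against each other: with descending order, the "next two rows" of the frame are the two earlier timestamps, so the windowed sum at each row matches the lagged forward sum. Note one edge-case difference, assuming standard frame semantics: in the first two rows of each partition, the partial frame sums fewer rows, whereas lag's default fills in 0.

```scala
// Hypothetical temp1 values for a single id, in ascending timestamp order
val temp1 = Vector(1, 2, 3, 4, 5)

// Original intent: rolling sum over current + next 2 rows, then lag by 2 (default 0)
val rollingSum = temp1.indices.map(i => temp1.slice(i, i + 3).sum)
val intended   = rollingSum.indices.map(i => if (i >= 2) rollingSum(i - 2) else 0)

// Workaround: order descending, sum over current row + next 2 rows of that frame
val desc       = temp1.reverse
val workaround = desc.indices.map(i => desc.slice(i, i + 3).sum).reverse

println(intended)   // Vector(0, 0, 6, 9, 12)
println(workaround) // Vector(1, 3, 6, 9, 12) -- differs only at the partition edge
```

From the third row of each partition onward the two columns agree exactly.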


Solution

  • I ran into the same problem and then noticed you are using the over function inside lag (same as me). I changed it to something like this:

    df.withColumn("lag1", lag(sum(col("temp1")), 2, 0).over(lagWindow))