I have a DataFrame with the following schema:
root
|-- clip_id: string (nullable = true)
|-- frames: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- frame_id: string (nullable = true)
| | |-- data_source_info: array (nullable = true)
| | | |-- element: struct (containsNull = false)
| | | | |-- data_source_path: string (nullable = true)
| | | | |-- sub_rules: array (nullable = true)
| | | | | |-- element: string (containsNull = true)
| | | | |-- device: string (nullable = true)
| | | | |-- file_type: string (nullable = true)
| | | | |-- md5: string (nullable = true)
Here is my code (my Spark version is 3.0.2):
data.select("clip_id", "frame_id", "data_source_path", "sub_rules", "device", "file_type", "md5")
  .withColumn("data_source_info", struct(col("data_source_path"), col("sub_rules"), col("device"), col("file_type"), col("md5")))
  .drop("data_source_path", "sub_rules", "device", "file_type", "md5")
  .groupBy("clip_id", "frame_id")
  .agg(collect_list("data_source_info").as("data_source_info"))
  .withColumn("frames", struct(col("frame_id"), col("data_source_info")))
  .sort(col("clip_id").asc, col("frame_id").asc)
  .groupBy(col("clip_id"))
  .agg(collect_list("frames").asc_nulls_first.as("frames"))
What I want is to sort the frames by frame_id, but I got an error like this:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 37.0 failed 4 times, most recent failure: Lost task 0.3 in stage 37.0 (TID 2447, 10.134.64.140, executor 39): java.lang.UnsupportedOperationException: Cannot evaluate expression: input[1, array<struct<frame_id:string,data_source_info:array<struct<data_source_path:string,sub_rules:array<string>,device:string,file_type:string,md5:string>>>>, true] ASC NULLS FIRST
at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval(Expression.scala:301)
at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval$(Expression.scala:300)
at org.apache.spark.sql.catalyst.expressions.SortOrder.eval(SortOrder.scala:62)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:156)
at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:76)
at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$5(AggregationIterator.scala:259)
at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:86)
at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:33)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:32)
at org.sparkproject.guava.collect.Ordering.leastOf(Ordering.java:658)
at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2(RDD.scala:1492)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Another way I tried, using a UDF:
import scala.collection.mutable.WrappedArray
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

def frameIdSort(frames: WrappedArray[GenericRowWithSchema]): WrappedArray[GenericRowWithSchema] =
  frames.map(x => (x.getAs[String]("frame_id"), x)).sortBy(_._1).map(_._2)
but that also failed, with another error:
java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema is not supported
So what can I do to sort the column frames by frame_id?
The error message Cannot evaluate expression: input[1, array... means that you cannot use asc_nulls_first inside agg (or select, for that matter). It is an expression describing how a dataframe should be sorted, and it can only be used inside an orderBy or sort function.
What you seem to want, however, is not to sort the dataframe but to sort an array column inside the dataframe. For that, you can use array_sort, and since you want to sort by frame_id, which is the first field of the struct, you don't have to change anything else in the code:
data.select("clip_id", "frame_id", "data_source_path", "sub_rules", "device", "file_type", "md5")
  .withColumn("data_source_info", struct(col("data_source_path"), col("sub_rules"), col("device"), col("file_type"), col("md5")))
  .drop("data_source_path", "sub_rules", "device", "file_type", "md5")
  .groupBy("clip_id", "frame_id")
  .agg(collect_list("data_source_info").as("data_source_info"))
  .withColumn("frames", struct(col("frame_id"), col("data_source_info")))
  // .sort(col("clip_id").asc, col("frame_id").asc)
  .groupBy(col("clip_id"))
  .agg(collect_list("frames") as "frames")
  .withColumn("frames", array_sort(col("frames")))
NB: I commented out the sort because groupBy does not maintain order (see "does groupBy after orderBy maintain that order?"). You may put a sort back at the very end if you need the rows themselves ordered.