I have the following dataframe (df_parquet):
DataFrame[id: bigint, date: timestamp, consumption: decimal(38,18)]
I intend to get sorted lists of dates and consumptions using collect_list, just as stated in this post: collect_list by preserving order based on another variable
I am following the last approach (https://stackoverflow.com/a/49246162/11841618), which I think is the most efficient one.
So instead of just calling repartition with the default number of partitions (200) I call it with 500, and I sort within partitions by id and date, not just by date (in order to make the groupBy more efficient, or so I hope). The thing is that once per partition (for exactly one id per partition, and it seems to be a random id) the first item of the list ends up in the last place.
Any clue about what is going on? The arrays of the rest of the ids are correctly sorted, so I think something is going on with the way groupBy or collect_list behave inside each partition.
I verified that it is not the first or last id of a partition that behaves differently: I got the partition id and checked whether the same groupBy + collect_list combination fails on one of those boundary ids, and it does not, so it seems random.
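For reference, here is a rough sketch of that check (the names tagged, pid, first_id and last_id are just illustrative): spark_partition_id tags each row with its physical partition, and since each partition is sorted by id, the first and last id of a partition are simply its min and max.

from pyspark.sql import functions as F

tagged = df_parquet.repartition(500, 'id').sortWithinPartitions(['id', 'date']) \
                   .withColumn('pid', F.spark_partition_id())

# first and last id of every partition
partition_bounds = tagged.groupby('pid').agg(F.min('id').alias('first_id'),
                                             F.max('id').alias('last_id'))

# then join the failing ids (from the 'test' DataFrame below) against
# partition_bounds and check whether any of them match first_id or last_id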
You can check my code if you want; it's pretty simple.
ordered_df = df_parquet.repartition(500, 'id').sortWithinPartitions(['id', 'date'])

grouped_df = ordered_df.groupby('id').agg(F.collect_list('date').alias('date'),
                                          F.collect_list('consumption').alias('consumption'))
And the code used to test it (it compares the first and last value of each array; the first should be the older one, but in 500 cases it is not):
test = grouped_df.filter(F.size('date') > 1).select(
    'id',
    (F.col('date').getItem(0) > F.col('date').getItem(F.size('date') - 1)).alias('test'),
    F.array([F.col('date').getItem(0),
             F.col('date').getItem(F.size('date') - 1)]).alias('see')
).filter(F.col('test'))
test.show(5, 100)
test.count()
And the results:
+-----+----+------------------------------------------+
| id|test| see|
+-----+----+------------------------------------------+
|89727|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|76325|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|80115|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|89781|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|76411|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
+-----+----+------------------------------------------+
only showing top 5 rows
500
While it is expected to be an empty DataFrame, since all the arrays should be sorted for all the ids.
OK, the question is still unsolved, but I found an easy workaround, just in case somebody gets stuck because of this same issue:
The point is to move the misplaced last element of the arrays back to the front. On the date array this can be done by sorting it with the array_sort function introduced in Spark 2.4. To perform the same reordering on the consumption array we need a UDF.
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, DoubleType

# move the last element of the array back to the front
invert_last = F.udf(lambda vector: [vector[-1]] + vector[:-1], ArrayType(DoubleType()))

test = grouped_df.withColumn('error', (F.size('date') > 1) & (F.col('date').getItem(0) >
           F.col('date').getItem(F.size('date') - 1))) \
       .withColumn('date', F.when(F.col('error'), F.array_sort(F.col('date'))).otherwise(F.col('date'))) \
       .withColumn('consumption', F.when(F.col('error'), invert_last(F.col('consumption'))).otherwise(F.col('consumption'))) \
       .drop('error')
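To double-check the workaround, the same first-vs-last comparison can be rerun on the repaired DataFrame; it should now report zero rows (a quick sketch, the name fixed_check is just for illustration):

fixed_check = test.filter((F.size('date') > 1) &
                          (F.col('date').getItem(0) > F.col('date').getItem(F.size('date') - 1)))
fixed_check.count()  # expected: 0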
Cheers.