Tags: python, apache-spark, group-by, pyspark, partitioning

Spark (pySpark) groupBy misordering first element on collect_list


I have the following dataframe (df_parquet):

DataFrame[id: bigint, date: timestamp, consumption: decimal(38,18)]

I intend to get sorted lists of dates and consumptions using collect_list, just as stated in this post: collect_list by preserving order based on another variable

I am following the last approach (https://stackoverflow.com/a/49246162/11841618), which is the one I think is more efficient.

So instead of just calling repartition with the default number of partitions (200), I call it with 500, and I sort within partitions by id and date, not just by date (in order to make the groupBy more efficient, or so I hope). The thing is that once per partition (for only one id per partition, and it seems to be a random id) I get the first item of the list in the last place.

Any clue on what is going on? The rest of the ids are well sorted in their arrays, so I think there is something going on with the way groupBy or collect_list behave inside each partition.

I verified that it's not the first or last id of a partition that behaves differently: I got the partition id and checked whether the same groupBy + collect_list combination fails on one of those values, so it seems to be random (a minimal sketch of that kind of check is shown below).
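
As an illustrative sketch only (assuming the same df_parquet as in the code below; the pid column name is just for illustration), the partition each row lands in can be tagged with spark_partition_id before grouping and carried alongside the collected dates:

    from pyspark.sql import functions as F

    # Tag each row with the partition it lands in after the repartition + sort,
    # so any misordered id can be matched against its partition.
    with_pid = (df_parquet
                .repartition(500, 'id')
                .sortWithinPartitions(['id', 'date'])
                .withColumn('pid', F.spark_partition_id()))

    by_id = with_pid.groupby('id').agg(
        F.first('pid').alias('pid'),
        F.collect_list('date').alias('date'))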

You can check my code if you want; it's pretty simple.


    from pyspark.sql import functions as F

    # Repartition by id and sort each partition by id and date before collecting.
    ordered_df = df_parquet.repartition(500, 'id').sortWithinPartitions(['id', 'date'])

    grouped_df = ordered_df.groupby('id').agg(
        F.collect_list('date').alias('date'),
        F.collect_list('consumption').alias('consumption'))

And the code used to test it (comparing the first and last value; the first should be older, but in 500 cases it is not):


    # Keep ids whose first collected date is newer than the last one (should never happen).
    test = grouped_df.filter(F.size('date') > 1).select(
        'id',
        (F.col('date').getItem(0) > F.col('date').getItem(F.size('date') - 1)).alias('test'),
        F.array([F.col('date').getItem(0),
                 F.col('date').getItem(F.size('date') - 1)]).alias('see')
    ).filter(F.col('test'))

    test.show(5, 100)

    test.count()

And the results:

+-----+----+------------------------------------------+
|   id|test|                                       see|
+-----+----+------------------------------------------+
|89727|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|76325|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|80115|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|89781|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|76411|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
+-----+----+------------------------------------------+
only showing top 5 rows

500

It is expected to be an empty dataframe, as all the arrays should be sorted for all the ids.


Solution

  • Ok, the question is still unsolved, but I found an easy workaround, just in case somebody gets stuck because of this same issue:

    The point is to move the misplaced element from the last position back to the front of the arrays. On the date array this can be done by sorting with the array_sort function introduced in Spark 2.4. To perform the same reordering on the consumption array we need to use a UDF.

    from pyspark.sql.types import ArrayType, DoubleType

    # Move the last element of a list back to the front.
    invert_last = F.udf(lambda vector: [vector[-1]] + vector[:-1], ArrayType(DoubleType()))

    test = (grouped_df
            .withColumn('error', (F.size('date') > 1) &
                        (F.col('date').getItem(0) > F.col('date').getItem(F.size('date') - 1)))
            .withColumn('date', F.when(F.col('error'),
                        F.array_sort(F.col('date'))).otherwise(F.col('date')))
            .withColumn('consumption', F.when(F.col('error'),
                        invert_last(F.col('consumption'))).otherwise(F.col('consumption')))
            .drop('error'))
    
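    As a side note, a sketch of a different way to avoid the issue altogether (this is the struct-based approach from the linked question, not the workaround above) is to collect (date, consumption) pairs as structs and sort them once, so both arrays stay aligned without any repartition or sortWithinPartitions step:

    # Collect date/consumption pairs, sort by date (the first struct field),
    # then split the sorted structs back into two aligned arrays.
    grouped_df = (df_parquet
                  .groupby('id')
                  .agg(F.sort_array(F.collect_list(F.struct('date', 'consumption')))
                        .alias('collected'))
                  .select('id',
                          F.col('collected.date').alias('date'),
                          F.col('collected.consumption').alias('consumption')))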

    Cheers.