Tags: python, apache-spark, sorting, pyspark, indexing

PySpark add rank column to large dataset


I have a large dataframe and I want to compute a metric based on the rank of one of the columns. This metric really only depends on two columns from the dataframe, so I first select the two columns I care about, then compute the metric. Once the two relevant columns are selected, the dataframe looks something like this:

score     | truth
-----------------
0.7543    | 0
0.2144    | 0
0.5698    | 1
0.9221    | 1

The metric that we want to calculate is called "average percent rank", and we want to calculate it over the rows where truth == 1. So the process is to compute the percent rank for every data point, then select the rows where truth == 1, and finally compute the average percent rank of those data points. However, when we try to compute this, we get OOM errors. One of the issues is that the pyspark.sql function rank requires a Window, and we want the window to include the entire dataframe (the same goes for percent_rank). Some code:

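from pyspark.sql import Window
from pyspark.sql import functions as F

# Window over the whole dataframe: there is no partitionBy, only an ordering
w = Window.orderBy(F.col("score"))

avg_percent_rank = (
    df
    .select("score", "truth")
    .withColumn("percent_rank", F.percent_rank().over(w))
    .filter(F.col("truth") == 1)
    .agg(F.mean(F.col("percent_rank")))
)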

This results in an OOM error. There are over 6 billion records, and we need to build this for datasets that may be a hundred times larger. Ultimately, the critical operation is the sorting and indexing; we can derive percent_rank from this by dividing by the total number of rows.
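
To make the target quantity concrete, here is a minimal plain-Python sketch (not Spark code) that computes it for the four sample rows above by sorting and indexing. It assumes the same convention as Spark's percent_rank, that is (rank - 1) / (n - 1), and that there are no tied scores; the variable names are only illustrative.

```python
# Sample rows from the table above: (score, truth)
rows = [(0.7543, 0), (0.2144, 0), (0.5698, 1), (0.9221, 1)]

n = len(rows)

# Sort by score and index: rank 1 is the lowest score (no ties in this sample)
ranked = sorted(rows, key=lambda r: r[0])
percent_ranks = {
    score: (rank - 1) / (n - 1)
    for rank, (score, _truth) in enumerate(ranked, start=1)
}

# Average percent rank over the rows where truth == 1
positives = [percent_ranks[score] for score, truth in rows if truth == 1]
avg_percent_rank = sum(positives) / len(positives)
print(avg_percent_rank)
```

Here the two truth == 1 rows have percent ranks 1/3 and 1, so the average is 2/3.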

Is there a better approach to computing rank than using a Window function?


Solution

  • AFAIK, there is no solution in the Spark SQL API that scales for building a global rank or percent_rank over an entire dataframe. Therefore, let's build our own. For that, we will divide the dataframe into N blocks that are going to be handled in parallel. Then we will collect the size of each block and use those sizes to offset the ranks computed inside each block.

    To divide the dataframe into blocks, we have lots of options. The only requirement is to respect the order of the score: block 0 should contain the "first" scores (say between 0 and 0.01), block 1 the next ones (say between 0.01 and 0.02), and so on.

    We could use orderBy, but in this case, since we know the lower and upper bounds of the score column (assumed here to be 0 and 1), let's do something simpler:

    from pyspark.sql import Window
    from pyspark.sql import functions as F

    # Number of blocks the dataframe will be split into
    N = 1000

    # Assign each row to a block id (assumes score lies in [0, 1])
    indexed_df = df.withColumn("block", F.floor(F.col("score") * N))

    # Compute the size of each block. The output of groupBy has no guaranteed
    # order, so sort by block id before accumulating the sizes below.
    block_sizes = indexed_df.groupBy("block").count().orderBy("block").collect()

    # For each block, compute the number of rows in all the blocks before
    # this one (rank_increments) and the total number of rows (total)
    total = 0
    rank_increments = {}
    for row in block_sizes:
        rank_increments[row['block']] = total
        total += row['count']

    # Join that information to the original dataframe, compute the rank
    # within each block individually, then use rank_increment to shift
    # the rank of each element in each block.
    # `spark` is the active SparkSession.
    rank_inc_df = spark.createDataFrame(
        list(rank_increments.items()), ['block', 'rank_increment']
    )
    win = Window.partitionBy("block").orderBy("score")
    result = (
        indexed_df
        .join(rank_inc_df, ['block'])
        .withColumn("rank", F.rank().over(win) + F.col("rank_increment"))
        .withColumn("percent_rank", (F.col("rank") - 1) / (total - 1))
    )
    

    Notice that both the rank and the percent_rank are computed here, the latter being derived from the former. Feel free to drop the unnecessary columns (block, rank_increment and possibly rank).
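
    As a sanity check of the idea, the following plain-Python sketch (not Spark code) models the same steps on a small random sample and compares the result with a rank computed over the whole sample at once. The helper sql_rank and the other names are hypothetical; it assumes scores in [0, 1] and rounds them to create ties.

```python
import random
from collections import Counter, defaultdict

random.seed(0)
N = 10  # number of blocks, kept small for the illustration
# Rounding to two decimals creates tied scores on purpose
scores = [round(random.random(), 2) for _ in range(1000)]


def sql_rank(values):
    """Rank with gaps, like SQL RANK(): tied values share the lowest position."""
    first_pos = {}
    for pos, value in enumerate(sorted(values), start=1):
        first_pos.setdefault(value, pos)
    return first_pos


def block_of(score):
    """Block id of a score; equal scores always land in the same block."""
    return int(score * N)


# Reference: rank computed over the whole sample at once
global_rank = sql_rank(scores)

# Block sizes, then cumulative offsets accumulated in block order
sizes = Counter(block_of(s) for s in scores)
rank_increments, total = {}, 0
for block in sorted(sizes):
    rank_increments[block] = total
    total += sizes[block]

# Rank inside each block, shifted by the offset of that block
by_block = defaultdict(list)
for s in scores:
    by_block[block_of(s)].append(s)

block_rank = {}
for block, values in by_block.items():
    for score, rank in sql_rank(values).items():
        block_rank[score] = rank + rank_increments[block]

print(block_rank == global_rank)
```

    The two rankings agree because every score in an earlier block is smaller than every score in a later block, and tied scores always share a block.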

    If executors still run out of memory, try increasing N so that each block gets smaller.

    NB: this approach assumes that the scores are roughly uniformly distributed. If that's not the case, the work will not be shared evenly, but in most cases it should still work if N is big enough. In any case, it will already be better than doing everything on the same executor as before.
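
    For skewed scores, one possible variation (an assumption, not part of the approach above) is to take the block boundaries from quantiles of the score instead of equal-width intervals, so that the blocks hold roughly the same number of rows. In Spark, approximate boundaries could come from DataFrame.approxQuantile. The plain-Python sketch below only illustrates the idea on a small skewed sample, using exact quantiles and hypothetical names.

```python
import bisect
import random
from collections import Counter

random.seed(0)
N = 10  # number of blocks
# Skewed sample: most scores are close to 0
scores = [random.random() ** 4 for _ in range(10_000)]

# Equal-width blocks, as with floor(score * N)
equal_width = Counter(int(s * N) for s in scores)

# Quantile boundaries: N - 1 cut points taken from the sorted sample
ordered = sorted(scores)
boundaries = [ordered[len(ordered) * i // N] for i in range(1, N)]

# bisect_right maps each score to a block id that never decreases with the
# score, and equal scores always get the same block id
quantile_blocks = Counter(bisect.bisect_right(boundaries, s) for s in scores)

print("largest equal-width block:", max(equal_width.values()))
print("largest quantile block:", max(quantile_blocks.values()))
```

    The rest of the computation stays the same, since it only requires that block ids respect the order of the scores.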