Tags: performance, for-loop, if-statement, vectorization, python-polars

How to run a conditional nested loop more efficiently using Polars?


I am kind of reposting this question because I am still looking for a faster implementation (now with Polars). Switching from pandas to Polars already gave me roughly a 10x speedup, but I still need a more efficient way to run this function. The problem is that I have two nested for-loops, with a condition on the inner one (conditional on another DataFrame).

The setup is the same. I have a dataset with multiple ids, values, and dates (large T, large N). Additionally, there are two other datasets: SB_dates, with random dates (in blocks), and newBL, a boolean matrix of the same shape as SB_dates.

For each new block, I need to select n = 5 random ids and calculate the average of their values on that date. If it's not a new block, I should retain the same random ids that were selected previously.

Using Polars, I have coded the function like this, with data and SB_dates as pl.DataFrames.

def get_mean_chrono_polars(data, SB_dates, newBL, n=5):
    n_rows, n_columns = SB_dates.shape
    df_sb = pl.DataFrame()

    for col in range(n_columns):
        date_column = pl.DataFrame(SB_dates[:, col])
        newBL_column = newBL[:, col]
        mean_values_col = []

        for i in range(n_rows):
            # all rows of the main data on this block date
            filter_ids = (
                data
                .select(pl.col("date", "values", "ids"))
                .filter(pl.col("date") == date_column[i, 0])
            )
            # draw a fresh sample of n ids only at the start of a new block;
            # otherwise keep the ids selected in the previous iteration
            if newBL_column[i]:
                random_ids = filter_ids.select(pl.col("ids").shuffle(seed=1)).limit(n)

            selected_ids_df = (
                filter_ids
                .select(pl.col("date", "values", "ids"))
                .filter(pl.col("ids").is_in(random_ids["ids"]))
            )

            mean_values = selected_ids_df["values"].mean()
            mean_values_col.append(mean_values)

        mean_values_col = pl.Series(str(col), mean_values_col)
        df_sb = df_sb.hstack([mean_values_col])
    return df_sb


r = get_mean_chrono_polars(data, SB_dates, newBL, n=5)

The data

import polars as pl
import random
from datetime import datetime, timedelta

n_rows = 1000  # You can adjust this as needed
start_date = datetime(2000, 1, 1)
end_date = datetime(2023, 12, 31)

dates = [
    start_date + timedelta(days=random.randint(0, (end_date - start_date).days))
    for _ in range(n_rows)
]

unique_ids = [random.randint(0, 100) for _ in range(n_rows)]
returns = [random.uniform(-0.01, 0.01) for _ in range(n_rows)]
data = pl.DataFrame({'date': dates, 'values': returns, 'ids': unique_ids})
data = data.with_columns(date=pl.col("date").dt.month_end())
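
The snippet above builds only data; for completeness, SB_dates and newBL could look something like this (the 20 × 6 shape and a new block every 4 rows are arbitrary choices for illustration only, and newBL could just as well be a NumPy boolean array):

n_sb_rows, n_sb_cols, block_len = 20, 6, 4  # assumed sizes, purely illustrative

# month-end dates, so they can match the dates in `data`
SB_dates = pl.DataFrame({
    f"col{c}": [
        start_date + timedelta(days=random.randint(0, (end_date - start_date).days))
        for _ in range(n_sb_rows)
    ]
    for c in range(n_sb_cols)
}).with_columns(pl.all().dt.month_end())

# True marks the start of a new block; here a new block starts every `block_len` rows
newBL = pl.DataFrame({
    f"col{c}": [i % block_len == 0 for i in range(n_sb_rows)]
    for c in range(n_sb_cols)
})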

Sorry again for the long post. Here is the link to the original post: Speeding up by removing col and row for-loops (loop with condition)


Solution

    • (See comments below; thank you to @jqurious.) Melting the two static dataframes, SB_dates and newBL, gives you all the data in column1, column2, ... order, which is exactly the order needed for this problem and eliminates the first loop. (Even if it did not, a sort could produce that order.) Each element is uniquely identified by row_nr and the variable column (renamed to col) in this newly reshaped DataFrame; a small sketch of this long format follows after this list.
    • A left join (which maintains the order of the left DataFrame) of this static dataframe with the main data does the whole filter_ids = ... step in one go.
    • A group_by then sets up the "choose n random ids on that day" step.
    • With maintain_order=True, we further maintain the original order from the left join, and thus each element of the final returned DataFrame will correctly correspond to each element of SB_dates and newBL.
    • A pl.when can be used in the aggregation to null out any unneeded calculations.
    • Note also that we only need the values after the aggregation; the ids were only needed to pair with the corresponding values in the sample. Now we discard everything but the values.
    • Unnesting the values list with list.to_struct followed by unnest avoids lists of nulls / structs of nulls; when newBL is False there is simply a single null per new column.
    • Each mean is then calculated, and fill_null with strategy='forward' is applied at this point so rows that are not new blocks inherit the previous block's mean.

    That gets you the mean_vals in a one-column dataframe, which can optionally be reshaped back into the original SB_dates shape if desired.
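
    As a quick, purely illustrative sketch of that reshaping (toy 3 × 2 frames whose bd/bn names simply mirror the code below), melting both static frames and attaching the melted newBL column gives one long frame in column-major order:

    import polars as pl
    from datetime import date

    # toy stand-ins for sb_data and blnew_data, 3 rows x 2 columns
    sb_toy = pl.DataFrame({
        'bd0': [date(2020, 1, 31), date(2020, 2, 29), date(2020, 3, 31)],
        'bd1': [date(2021, 1, 31), date(2021, 2, 28), date(2021, 3, 31)],
    })
    bl_toy = pl.DataFrame({'bn0': [True, False, True], 'bn1': [True, True, False]})

    static_toy = (
        sb_toy.with_row_count()
        .melt('row_nr', variable_name='col', value_name='date')
        .with_columns(bl_toy.melt().select(bl='value'))
    )
    # static_toy has columns row_nr, col, date, bl and 6 rows:
    # row_nr 0, 1, 2 for 'bd0' first, then row_nr 0, 1, 2 for 'bd1',
    # so a forward fill never crosses from one original column into the next.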

    import polars as pl
    import random
    from datetime import date, timedelta
    
    n_rows = 100000
    start_date = date(2000, 1, 1)
    end_date = date(2023, 12, 31)
    
    random.seed(1)
    
    # Generate data
    dates = [
        start_date + timedelta(days=random.randint(0, (end_date - start_date).days))
        for _ in range(n_rows)
    ]
    sb_dates = [
        [
            start_date + timedelta(days=random.randint(0, (end_date - start_date).days))
            for _ in range(20)
        ]
        for _ in range(6)
    ]
    returns = [random.uniform(-0.01, 0.01) for _ in range(n_rows)]
    ids = random.sample(range(n_rows * 1000), n_rows)
    
    # Create a Polars DataFrame (month-end dates, mirroring the question's setup)
    data = pl.DataFrame({'date': dates, 'values': returns, 'ids': ids}).with_columns(
        pl.col('date').dt.month_end()
    )
    
    sb_data = pl.DataFrame({f'bd{i}': sb_dates[i] for i in range(6)}).with_columns(
        (pl.all().dt.month_end())
    )
    blnew_data = pl.DataFrame(
        {
            # the first row of every column is always True, so each column starts
            # with a new block and the forward fill below always has a value
            f'bn{i}': [random.random() < 0.5 if j else True for j in range(20)]
            for i in range(6)
        }
    )
    
    
    def get_mean_chrono_polars(data, SB_dates, newBL, n=5):
        static_data = (
            SB_dates.with_row_count()
            .melt('row_nr', variable_name='col', value_name='date')
            .with_columns(newBL.melt().select(bl='value'))
        )
    
        return (
            static_data.join(data, on='date', how='left')
            .group_by('row_nr', 'col', maintain_order=True)
            .agg(pl.when(pl.col('bl')).then(pl.col('ids', 'values').sample(n)))
            .select(
                pl.col('values').list.to_struct(
                    n_field_strategy='max_width',
                    fields=[f'val{j}' for j in range(n)],
                    upper_bound=n,
                ),
            )
            .unnest('values')
            .select(
                mean_val=(pl.sum_horizontal(pl.all()) / n)
                .fill_null(strategy='forward')
            # steps from this point are optional
                .reshape(SB_dates.shape)
                .list.to_struct(
                    n_field_strategy='max_width',
                    fields=[f'{j}' for j in range(len(SB_dates.columns))],
                    upper_bound=len(SB_dates.columns),
                )
            )
            .unnest('mean_val')
        )
    
    get_mean_chrono_polars(data, sb_data, blnew_data)
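
    One caveat: the .sample(n) inside the aggregation draws a different random set of ids on every run. If repeatable output is wanted (the loop version used a seeded shuffle), Expr.sample accepts a seed argument, so seed= can be added to the .sample(n) call above. A tiny, purely illustrative check on the generated data:

    # assumption for illustration: passing `seed` to Expr.sample makes the draw repeatable
    draw_1 = data.select(pl.col('ids').sample(5, seed=1)).to_series().to_list()
    draw_2 = data.select(pl.col('ids').sample(5, seed=1)).to_series().to_list()
    assert draw_1 == draw_2  # same ids on every run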