Search code examples
pythonpython-polars

Python Polars: How to add a progress bar to map_elements / map_groups?


Is it possible to add a progress bar to a Polars apply loop with a custom function?

For example, how would I add a progress bar to the following toy example:

# Toy data set: six rows, one per (team, conference, points, rebounds) record.
df = pl.DataFrame({
    "team": ["A", "A", "A", "B", "B", "C"],
    "conference": ["East", "East", "East", "West", "West", "East"],
    "points": [11, 8, 10, 6, 6, 5],
    "rebounds": [7, 7, 6, 9, 12, 8],
})

# Apply a custom function to every "team" group; here it selects the mean of
# "points". This is the loop the progress bar should track.
(
    df
    .group_by("team")
    .map_groups(lambda group: group.select(pl.col("points").mean()))
)

Edit 1:

After help from @Jcurious, I have the following 'tools' that can be re-used for other functions; however, the progress bar does not print to the console correctly.

def pl_progress_applier(func, task_id, progress, **kwargs):
    """Advance a progress task by one step, then call ``func``.

    Args:
        func: Callable invoked as ``func(**kwargs)``.
        task_id: Identifier of the task to advance on ``progress``.
        progress: Object exposing ``update(task_id, advance=..., refresh=...)``.
        **kwargs: Keyword arguments forwarded unchanged to ``func``.

    Returns:
        Whatever ``func(**kwargs)`` returns.
    """
    # The bar is ticked (and redrawn immediately) before the work starts.
    progress.update(task_id, refresh=True, advance=1)
    outcome = func(**kwargs)
    return outcome

def pl_groupby_progress_apply(data, group_by, func, drop_cols=None, **kwargs):
    """Apply ``func`` to every group of ``data`` while showing a progress bar.

    Args:
        data: Frame to group; must support ``select``, ``group_by`` and
            ``map_groups`` (a Polars DataFrame in the accompanying example).
        group_by: Column name(s) used both to count the groups and to group.
        func: Callable invoked once per group as ``func(x=group, **kwargs)``,
            so it must accept the group through a parameter named ``x``.
        drop_cols: Columns removed from each group before it is handed to
            ``func``. ``None`` (the default) drops nothing.
        **kwargs: Extra keyword arguments forwarded to ``func``.

    Returns:
        The result of ``map_groups`` over all groups.
    """
    # Use a fresh list per call instead of a shared mutable default argument.
    cols_to_drop = [] if drop_cols is None else drop_cols

    # The bar is kept in a local name: nothing outside this call needs it, so
    # it is not published as a module-level global.
    with Progress() as progress:
        # One progress step per distinct group key.
        num_groups = len(data.select(group_by).unique())
        task_id = progress.add_task('Applying', total=num_groups)

        def _apply_to_group(group):
            return pl_progress_applier(
                func=func,
                task_id=task_id,
                progress=progress,
                x=group.drop(cols_to_drop),
                **kwargs,
            )

        return data.group_by(group_by).map_groups(_apply_to_group)

# Using the function custom_func we get a table back; however, the progress bar jumps straight to 100%.

def custom_func(x):
    """Select the mean of the ``points`` column from the group ``x``."""
    mean_points = pl.col('points').mean()
    return x.select(mean_points)

# Run custom_func once per 'team' group of df, with a progress bar.
pl_groupby_progress_apply(df, group_by='team', func=custom_func)

Any ideas on how to get the progress bar to actually work?

Edit 2:

It seems the functions above do indeed work; however, the progress bar does not display correctly when the code is run from PyCharm (as in my case). Enjoy, non-PyCharm users!


Solution

  • I like the progress bars from Rich (which also comes bundled with pip)

    There's probably a neater way to package this up, but something like:

    from pip._vendor.rich.progress import (
        Progress, SpinnerColumn, TimeElapsedColumn
    )
    
    def polars_bar(total, title="Processing", transient=True):
        """Build a Rich ``Progress`` bar with an added ``run`` wrapper helper.

        Args:
            total: Number of steps each task created through ``run`` expects.
            title: Description shown for each task.
            transient: Passed to ``Progress``; when true the bar is removed
                once it has finished.

        Returns:
            The ``Progress`` instance, with a ``run(func, *args, **kwargs)``
            attribute attached. ``run`` registers a new task and returns a
            one-argument callable that advances that task by one step and
            then returns ``func(value, *args, **kwargs)``.
        """
        columns = [
            SpinnerColumn(),
            *Progress.get_default_columns(),
            TimeElapsedColumn(),
        ]
        bar = Progress(*columns, transient=transient)

        def _run(func, *args, **kwargs):
            # Every call to run() registers its own task on the shared bar.
            task_id = bar.add_task(title, total=total)

            def _tick_then_call(value):
                # Advance first, then hand the single input on to the
                # wrapped function together with the bound extra arguments.
                bar.update(task_id, advance=1)
                return func(value, *args, **kwargs)

            return _tick_then_call

        # Attach the helper to the instance so it can be used as bar.run(...).
        bar.run = _run
        return bar
    

    Examples

    .map_groups()

    def my_custom_group_udf(group, expr):
        """Return ``group.select(expr)`` after a short artificial delay."""
        time.sleep(0.7)  # artificial delay standing in for real work
        selected = group.select(expr)
        return selected
        
    # One progress step per distinct team.
    num_groups = df["team"].n_unique()

    with polars_bar(total=num_groups) as bar:
        # Wrap the UDF so that each group it receives advances the bar.
        tracked_udf = bar.run(
            my_custom_group_udf,
            expr=pl.col("points").mean().name.suffix("_mean"),
        )
        df.group_by("team").map_groups(tracked_udf)
    

    enter image description here

    .map_elements()

    def my_custom_udf(points, multiplier=1):
        """Return ``(points + 100) * multiplier`` after a short delay."""
        time.sleep(0.3)  # simulate some work
        shifted = points + 100
        return shifted * multiplier
        
    # One progress step per row of df.
    with polars_bar(total=df.height) as bar:
        # Wrap the UDF so that each element it receives advances the bar.
        tracked_udf = bar.run(my_custom_udf, multiplier=5)
        udf_column = (
            pl.col("points")
            .map_elements(tracked_udf, return_dtype=pl.Int64)
            .alias("udf")
        )
        df.with_columns(udf_column)
    

    enter image description here

    Note: tqdm also has Rich support: https://tqdm.github.io/docs/rich/