Is it possible to add a progress bar to a Polars apply loop with a custom function?
For example, how would I add a progress bar to the following toy example:
# Toy dataset used by the example call that follows.
df = pl.DataFrame(
    {
        "team": ["A", "A", "A", "B", "B", "C"],
        "conference": ["East", "East", "East", "West", "West", "East"],
        "points": [11, 8, 10, 6, 6, 5],
        "rebounds": [7, 7, 6, 9, 12, 8],
    }
)

# Group on "team" and hand each group to a Python callback that reduces it
# to the mean of its "points" column.
(
    df
    .group_by("team")
    .map_groups(lambda group: group.select(pl.col("points").mean()))
)
Edit 1:
After help from @Jcurious, I have the following 'tools' that can be re-used for other functions; however, the progress bar does not print to the console correctly.
def pl_progress_applier(func, task_id, progress, **kwargs):
    """Call ``func(**kwargs)`` and then advance a progress task by one step.

    Args:
        func: Callable invoked with ``**kwargs`` and nothing else.
        task_id: Identifier of the task to advance; passed straight to
            ``progress.update``.
        progress: Object exposing ``update(task_id, advance=..., refresh=...)``,
            e.g. a Rich ``Progress`` instance.
        **kwargs: Keyword arguments forwarded unchanged to ``func``.

    Returns:
        Whatever ``func(**kwargs)`` returns.
    """
    result = func(**kwargs)
    # Advance only once the work has finished, so the bar counts completed
    # calls; a call that raises is therefore not counted as done.
    progress.update(task_id, advance=1, refresh=True)
    return result
def pl_groupby_progress_apply(data, group_by, func, drop_cols=None, **kwargs):
    """Apply ``func`` to every group of ``data`` while showing a progress bar.

    Args:
        data: Frame to process; it is used via ``select``, ``group_by`` and
            the resulting ``map_groups``.
        group_by: Column(s) to group on; passed to both ``data.select`` and
            ``data.group_by``.
        func: Callable applied to each group. The group is passed as the
            keyword argument ``x``, followed by ``**kwargs``.
        drop_cols: Columns removed from each group before ``func`` sees it.
            ``None`` (the default) drops nothing.
        **kwargs: Extra keyword arguments forwarded to ``func`` on every call.
            They must not be named ``x``, ``func``, ``task_id`` or
            ``progress``, which are already used when calling the applier.

    Returns:
        The result of ``data.group_by(group_by).map_groups(...)``.
    """
    # Build the empty default per call instead of sharing one mutable list
    # object across every invocation.
    cols_to_drop = [] if drop_cols is None else drop_cols

    # One tick per distinct grouping key; computed before the bar is started
    # so a failure here does not leave a half-drawn bar behind.
    num_groups = len(data.select(group_by).unique())

    # ``progress`` is a plain local: the nested function closes over it, so
    # no module-level global is needed.
    with Progress() as progress:
        task_id = progress.add_task('Applying', total=num_groups)

        def _apply_to_group(group):
            return pl_progress_applier(
                x=group.drop(cols_to_drop),
                func=func,
                task_id=task_id,
                progress=progress,
                **kwargs,
            )

        return data.group_by(group_by).map_groups(_apply_to_group)
# Using the function custom_func, we can return a table; however, the progress bar jumps straight to 100%.
def custom_func(x):
    """Return the mean of the ``points`` column of the group ``x``.

    The parameter has to stay named ``x``: ``pl_groupby_progress_apply``
    passes each group to the callback by that keyword.
    """
    mean_points = pl.col('points').mean()
    return x.select(mean_points)
# Apply custom_func to each team's rows, reporting progress per group.
pl_groupby_progress_apply(data=df, group_by='team', func=custom_func)
Any ideas on how to get the progress bar to actually work?
Edit 2:
It seems the above functions do indeed work; however, if you're running them in PyCharm (like me), the progress bar does not display correctly. Enjoy, non-PyCharm users!
I like the progress bars from Rich (which also comes bundled with pip).
There's probably a neater way to package this up, but something like:
from pip._vendor.rich.progress import (
Progress, SpinnerColumn, TimeElapsedColumn
)
def polars_bar(total, title="Processing", transient=True):
    """Build a Rich ``Progress`` bar carrying a ``run`` helper for UDFs.

    Args:
        total: Number of steps each task created through ``run`` is expected
            to take (typically the number of UDF calls).
        title: Description given to each task created through ``run``.
        transient: Forwarded to ``Progress``; when true the bar is removed
            once it has finished.

    Returns:
        The ``Progress`` instance with one extra attribute, ``run``.
        ``bar.run(func, *args, **kwargs)`` registers a new task and returns a
        one-argument callable that evaluates ``func(value, *args, **kwargs)``
        and advances that task by one step.
    """
    bar = Progress(
        SpinnerColumn(),
        *Progress.get_default_columns(),
        TimeElapsedColumn(),
        transient=transient,  # remove bar when finished
    )

    def _run(func, *args, **kwargs):
        task_id = bar.add_task(title, total=total)

        def _tracked(value):
            result = func(value, *args, **kwargs)
            # Tick after the call returns so the bar shows completed work
            # rather than work that has merely been started.
            bar.update(task_id, advance=1)
            return result

        return _tracked

    # Attached to the instance so callers can write ``bar.run(...)`` inside
    # a ``with polars_bar(...) as bar:`` block.
    bar.run = _run
    return bar
.map_groups()
def my_custom_group_udf(group, expr, *, delay=.7):
    """Example per-group UDF: pause briefly, then evaluate ``expr`` on ``group``.

    Args:
        group: Object exposing ``select`` (a group frame in the example usage).
        expr: Expression handed to ``group.select``.
        delay: Seconds to sleep before selecting, standing in for real work.
            Keyword-only; defaults to the original 0.7 s.

    Returns:
        The result of ``group.select(expr)``.
    """
    time.sleep(delay)  # simulate some work
    return group.select(expr)
# The bar needs one step per distinct team.
num_groups = df["team"].n_unique()

with polars_bar(total=num_groups) as bar:
    # Wrap the UDF so each call advances the bar, then apply it group by group.
    tracked_udf = bar.run(
        my_custom_group_udf,
        expr=pl.col("points").mean().name.suffix("_mean"),
    )
    df.group_by("team").map_groups(tracked_udf)
.map_elements()
def my_custom_udf(points, multiplier=1, *, delay=.3):
    """Example element-wise UDF: pause briefly, then rescale ``points``.

    Args:
        points: Numeric value to transform.
        multiplier: Factor applied after adding 100.
        delay: Seconds to sleep before computing, standing in for real work.
            Keyword-only; defaults to the original 0.3 s.

    Returns:
        ``(points + 100) * multiplier``.
    """
    time.sleep(delay)  # simulate some work
    return (points + 100) * multiplier
# One step per row: the UDF is applied to every element of "points".
with polars_bar(total=df.height) as bar:
    tracked_udf = bar.run(my_custom_udf, multiplier=5)
    df.with_columns(
        pl.col("points")
        .map_elements(tracked_udf, return_dtype=pl.Int64)
        .alias("udf")
    )
Note: tqdm also has Rich support: https://tqdm.github.io/docs/rich/