I have a Python class with two data fields: the first is a Polars time series, the second a list of strings.
A dictionary maps each string to a function, and each function returns a Polars frame with a single column.
A method of the class then builds a Polars DataFrame whose first column is the time series and whose remaining columns are produced by these functions.
The columns are all independent.
Is there a way to create this DataFrame in parallel?
Here is my attempt at a minimal example:
class data_frame_constr():
    function_list: List[str]
    time_series: pl.DataFrame

    def compute_indicator_matrix(self) -> pl.DataFrame:
        for element in self.function_list:
            # with_columns returns a new frame, so the result has to be reassigned
            self.time_series = self.time_series.with_columns(
                mapping[element]  # here is where we construct columns with the loop and mapping[element] is a custom function that returns a pl column
            )
        return self.time_series
For example, function_list = ["square", "square_root"].
The time series is a single column. I need to create the square and square-root columns (or columns from other custom, more complex functions, each identified by its name), but the list of functions is known only at runtime, when it is passed to the constructor.
You can use the with_columns context to provide a list of expressions, as long as the expressions are independent. (Note the plural: with_columns.) Polars will attempt to run all expressions in the list in parallel, even if the list of expressions is generated dynamically at run-time.
def mapping(func_str: str) -> pl.Expr:
    '''Generate Expression from function string'''
    ...

def compute_indicator_matrix(self) -> pl.DataFrame:
    expr_list = [mapping(next_funct_str)
                 for next_funct_str in self.function_list]
    self.time_series = self.time_series.with_columns(expr_list)
    return self.time_series
One note: it is a common misconception that Polars is a generic thread pool that will run any/all code in parallel. This is not true.
If any of your expressions call external libraries or custom Python bytecode functions (e.g., using a lambda function, map_elements, map_batches, etc.), then your code will be subject to the Python GIL and will run single-threaded, no matter how you code it. Thus, try to use only Polars expressions to achieve your objectives (rather than calling external libraries or Python functions).
For example, try the following. (Choose a value of nbr_rows that will stress your computing platform.) If we run the code below, it will run in parallel because everything is expressed using Polars expressions without calling external libraries or custom Python code. The result is embarrassingly parallel performance.
import polars as pl

nbr_rows = 100_000_000
df = pl.DataFrame({
    'col1': pl.repeat(2, nbr_rows, eager=True),
})
df.with_columns(
    pl.col('col1').pow(1.1).alias('exp_1.1'),
    pl.col('col1').pow(1.2).alias('exp_1.2'),
    pl.col('col1').pow(1.3).alias('exp_1.3'),
    pl.col('col1').pow(1.4).alias('exp_1.4'),
    pl.col('col1').pow(1.5).alias('exp_1.5'),
)
However, if we instead write the code using lambda functions that call Python bytecode, then it will run very slowly.
import math

df.with_columns(
    pl.col('col1').map_elements(lambda x: math.pow(x, 1.1)).alias('exp_1.1'),
    pl.col('col1').map_elements(lambda x: math.pow(x, 1.2)).alias('exp_1.2'),
    pl.col('col1').map_elements(lambda x: math.pow(x, 1.3)).alias('exp_1.3'),
    pl.col('col1').map_elements(lambda x: math.pow(x, 1.4)).alias('exp_1.4'),
    pl.col('col1').map_elements(lambda x: math.pow(x, 1.5)).alias('exp_1.5'),
)