Tags: python, python-polars

How to know when to use map_elements, map_batches, lambda, and struct when using UDFs?


import polars as pl
import numpy as np

df_sim = pl.DataFrame({
   "daily_n": [1000, 2000, 3000, 4000],
   "prob": [.5, .5, .5, .6],
   "size": 1
   })

df_sim = df_sim.with_columns(
  pl.struct(["daily_n", "prob", "size"])
  .map_elements(lambda x: 
      np.random.binomial(n=x['daily_n'], p=x['prob'], size=x['size']))
  .cast(pl.Int32)
  .alias('events')
  )

df_sim


However, the following code fails with the message "TypeError: float() argument must be a string or a number, not 'Expr'":

df_sim.with_columns(
  np.random.binomial(n=col('daily_n'), p=col('prob'), size=col('size'))
  .alias('events')
  )

Why do some functions require use of struct(), map_elements() and lambda, while others do not?

In my case below I am able to simply refer to polars columns as function arguments by using col().

def local_double(x):
  return(2*x)

df_ab.with_columns(rev_2x = local_double(col("revenue")))



Solution

    Let's go back to what a context is/does.

    polars DataFrames (and LazyFrames) have contexts, which is just a generic way of referring to with_columns, select, agg, and group_by. The inputs to contexts are expressions. To a limited extent, the python side of polars can convert python objects into polars expressions. For example, a datetime or an int is easily converted, so when you write col('a')*2, polars turns it into the expression col('a').mul(lit(2)).
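
    To make that concrete, here's a tiny sketch (using a made-up column 'a') showing that the multiplication only builds an expression object; nothing is computed until polars evaluates it inside a context:

    import polars as pl

    # Both spellings build an expression; nothing is computed until the
    # expression is used inside a context such as select or with_columns.
    expr_sugar = pl.col("a") * 2                # Python's * dispatches to Expr.__mul__
    expr_explicit = pl.col("a").mul(pl.lit(2))

    print(type(expr_sugar))  # a pl.Expr, not data
    print(expr_sugar)        # prints the expression tree, not values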

    Functions that return expressions:

    Here's your function with type annotations.

    def local_double(x: pl.Expr) -> pl.Expr:
      return(2*x)
    

    It takes an Expr as input and returns another Expr as output. It doesn't do any work itself; it just hands polars a new Expr. Using this function is the same as writing df_ab.with_columns(rev_2x = 2*col("revenue")). In fact, polars isn't doing anything with your function when you write df_ab.with_columns(rev_2x = local_double(col("revenue"))): python's order of evaluation resolves the function call first, so the expression it returns is what gets passed into polars' context.
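
    As a quick sanity check (just a sketch reusing the annotated local_double from above), you can ask polars whether the two spellings build the same expression:

    import polars as pl

    def local_double(x: pl.Expr) -> pl.Expr:
        return 2 * x

    # The function only builds an expression; polars never sees the function itself.
    from_function = local_double(pl.col("revenue"))
    written_inline = 2 * pl.col("revenue")

    print(from_function.meta.eq(written_inline))  # True: structurally identical expressions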

    Why do we need map_batches and map_elements

    Remember that polars contexts expect expressions. One of the reasons polars is so fast and efficient is that behind the API sits its own query language and processing engine, and that engine speaks in expressions. To "translate" python code it doesn't already understand, you have to use one of the map_* functions. They materialize your expression into values and hand those values to your function: map_batches gives your function the whole pl.Series at once, while map_elements gives it one python value at a time. They are the translation layer that lets polars interact with arbitrary functions.
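
    To make the difference concrete, here's a small sketch (with a made-up one-column DataFrame) showing what each map_* hands to your function:

    import polars as pl

    df = pl.DataFrame({"a": [1, 2, 3]})

    df.with_columns(
        # map_elements: the function is called once per row with a plain Python int
        pl.col("a").map_elements(lambda v: v + 1, return_dtype=pl.Int64).alias("one_at_a_time"),
        # map_batches: the function is called once with the whole column as a pl.Series
        pl.col("a").map_batches(lambda s: s + 1).alias("whole_series"),
    )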

    Why do we need to wrap columns in struct?

    Polars is designed to evaluate multiple expressions in parallel. That means each expression doesn't know what any other expression is doing, and as a side effect, no expression can be the input of another expression in the same context. This may seem like a limiting design, but structs make it a non-issue: a struct is a column type that can hold multiple columns in one.

    If you're going to use a function that needs multiple inputs from your DataFrame, a struct gives you a way of packing multiple columns into a single expression. If your function only needs one column from your DataFrame, you don't need to wrap it in a struct.
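
    Here's a minimal sketch (with made-up columns n and p) of what pl.struct does on its own, independent of any UDF:

    import polars as pl

    df = pl.DataFrame({"n": [10, 20], "p": [0.1, 0.9]})

    # pl.struct packs several columns into one struct-typed column,
    # which is how a single expression can carry multiple inputs.
    packed = df.select(pl.struct("n", "p").alias("params"))
    print(packed.schema)            # {'params': Struct(...)}
    print(packed.unnest("params"))  # .unnest splits it back into separate columns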

    (bonus) Besides functions that return Exprs are there other times we don't need map_*?

    Yes. Numpy has what it calls Universal Functions, or ufuncs. You can use a ufunc directly in a context, giving it your col('a'), col('b') as inputs. For example, you can do

    df.with_columns(log_a = np.log(pl.col('a')))
    

    and it'll just work. You can even make your own ufunc with numba, which will also just work. The mechanism behind why ufuncs just work is actually the same as with functions that return expressions, just with more hidden steps. When a ufunc gets an input that isn't an np.ndarray, instead of raising an error (as you got with np.random.binomial, which isn't a ufunc), it checks whether the input has an __array_ufunc__ method. If it does, it runs that method. polars implements that method on pl.Expr, so the above gets converted into

    df.with_columns(log_a = pl.col('a').map_batches(np.log))
    

    If you have a ufunc that takes multiple inputs, it will even convert all of those inputs into a struct automatically.
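
    For example, a two-input ufunc like np.arctan2 can (in recent polars versions, as far as I know) be given two expressions directly and the struct packing happens behind the scenes:

    import numpy as np
    import polars as pl

    df = pl.DataFrame({"y": [1.0, 2.0], "x": [3.0, 4.0]})

    # np.arctan2 is a two-input ufunc; polars' __array_ufunc__ handles the
    # multi-input case for you, so no explicit pl.struct is needed here.
    df.with_columns(angle=np.arctan2(pl.col("y"), pl.col("x")))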

    Why do you need to use lambda sometimes?

    You don't ever need lambda; it's just a way to define and use a function in one line. Instead of your example, we could do this:

    def binomial_elements(x: dict) -> np.ndarray:
        return np.random.binomial(n=x['daily_n'], p=x['prob'], size=x['size'])
    
    
    df_sim.with_columns(
      pl.struct(["daily_n", "prob", "size"])
      .map_elements(binomial_elements)
      .cast(pl.Int32)
      .alias('events')
      )
    

    (bonus) When to use map_elements and when to use map_batches?

    Spoiler alert: Your example should be map_batches

    Anytime you're dealing with a vectorized function, map_batches is the better choice. I believe most (if not all) of numpy's functions are vectorized, as are scipy's. As such, your example would be more performant as:

    def binomial_batches(x: pl.Series) -> np.ndarray:
        return np.random.binomial(n=x.struct['daily_n'], p=x.struct['prob'])
    
    
    df_sim.with_columns(
      pl.struct("daily_n", "prob")
      .map_batches(binomial_batches)
      .cast(pl.Int32)
      .alias('events')
      )
    

    Notice that I took out the size parameter because numpy infers the output size from the size of daily_n and prob.

    Also, when you use map_batches, the function receives the struct column as a pl.Series rather than one dict per row. To access the individual fields within the struct Series you use the .struct namespace, so that's a bit of a syntax difference to be aware of between map_elements and map_batches.

    You could also do this as a lambda like

    df_sim.with_columns(
      pl.struct("daily_n", "prob")
      .map_batches(lambda x: np.random.binomial(n=x.struct['daily_n'], p=x.struct['prob']))
      .cast(pl.Int32)
      .alias('events')
      )
    

    One last overlooked thing about map_batches

    The function that you give map_batches is supposed to return a pl.Series, but in the example above it returns an np.ndarray. polars has pretty good interoperability with numpy, so it's able to automatically convert the np.ndarray into a pl.Series. One area where you might get tripped up is if you're using pyarrow.compute functions: polars won't automatically convert those results to a pl.Series, so you'd need to do it explicitly.
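
    As a rough sketch of that pyarrow case (assuming pyarrow is installed; the column and function here are just for illustration), the explicit conversion could look like:

    import polars as pl
    import pyarrow.compute as pc

    df = pl.DataFrame({"a": [1.2, 2.7, 3.5]})

    df.with_columns(
        pl.col("a")
        # pyarrow.compute returns a pyarrow Array, which map_batches won't
        # coerce automatically, so convert it back to a pl.Series yourself.
        .map_batches(lambda s: pl.from_arrow(pc.round(s.to_arrow())))
        .alias("a_rounded")
    )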

    As an aside:

    I made this gist of a decorator which will, in principle, take any function and make it look for the __array_ufunc__ method of its inputs so that you don't have to use map_*. I say "in principle" because I haven't tested it extensively, so I don't want to overhype it.

    A note on np.random.binomial (response to comment)

    There are 2+1 modes of np.random.binomial (and really of many np functions). What do I mean by 2+1?

    • You can give it a single value in each of n and p and then give it a size to get a 1d array with a length of size. This is essentially what your map_elements approach is doing.

    • You can give it an array for n or p and nothing for size, and it'll give you a 1d array matching the size of the array you gave it for n. This is what the map_batches approach is doing.

    • (the +1) You can combine the previous two modes: give it an array for n and p, and for size give it a tuple whose first element is the number of simulations per (n, p) pair and whose second element is the length of n and p. With that, it'll give you a 2d array with one row per simulation and one column per input element.

    You can get that 3rd mode in polars as long as you transpose the result so that each DataFrame row gets its own list of simulations. That would look like this:

    df_sim.with_columns(
      pl.struct("daily_n", "prob")
      .map_batches(lambda x: (
          np.random.binomial(
              n=x.struct['daily_n'], 
              p=x.struct['prob'], 
              size=(3,x.shape[0])
              ).transpose()
          )
      )
      .alias('events')
      )
    
    shape: (4, 4)
    ┌─────────┬──────┬──────┬────────────────────┐
    │ daily_n ┆ prob ┆ size ┆ events             │
    │ ---     ┆ ---  ┆ ---  ┆ ---                │
    │ i64     ┆ f64  ┆ i32  ┆ list[i64]          │
    ╞═════════╪══════╪══════╪════════════════════╡
    │ 1000    ┆ 0.5  ┆ 1    ┆ [491, 493, 482]    │
    │ 2000    ┆ 0.5  ┆ 1    ┆ [1032, 966, 972]   │
    │ 3000    ┆ 0.5  ┆ 1    ┆ [1528, 1504, 1483] │
    │ 4000    ┆ 0.6  ┆ 1    ┆ [2401, 2422, 2367] │
    └─────────┴──────┴──────┴────────────────────┘