Tags: python, dataframe, user-defined-functions, python-polars

Applying Python UDF function per row in a polars dataframe throws unexpected exception 'expected tuple, got list'


I have the following polars DataFrame in Python:

df = pl.DataFrame({
    "user_movies": [[7064, 7153, 78009], [6, 7, 1042], [99, 110, 3927], [2, 11, 152081], [260, 318, 195627]],
    "user_ratings": [[5.0, 5.0, 5.0], [4.0, 2.0, 4.0], [4.0, 4.0, 3.0], [3.5, 3.0, 4.0], [1.0, 4.5, 0.5]],
    "common_movies": [[7064, 7153], [7], [110, 3927], [2], [260, 195627]]
})
print(df.head())

I want to create a new column named "common_movie_ratings" that, for each row, takes from the ratings list only the ratings of the movies that appear in common_movies. For example, for the first row I should return only the ratings for the movies [7064, 7153], for the second row the rating for the movie [7], and so on and so forth.
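
For the sample above, the expected "common_movie_ratings" values would be:

[[5.0, 5.0], [2.0], [4.0, 3.0], [3.5], [1.0, 0.5]]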

For this reason, I created the following function:

def get_common_movie_ratings(row): #Each row is a tuple of arrays.
    common_movies = row[2] #the index of the tuple denotes the 3rd array, which represents the common_movies column.
    user_ratings = row[1]
    ratings_for_common_movies= [user_ratings[list(row[0]).index(movie)] for movie in common_movies]
    return ratings_for_common_movies

Finally, I apply the UDF to the dataframe like this:

df["common_movie_ratings"] = df.apply(get_common_movie_ratings, return_dtype=pl.List(pl.Float64))

Every time I apply the function, on the 3rd iteration/row I receive the following error

expected tuple, got list

(Screenshot: output after the 3rd iteration)

I have also tried a different approach for the UDF, like:

def get_common_movie_ratings(row):
    common_movies = row[2]
    user_ratings = row[1]
    ratings = [user_ratings[i] for i, movie in enumerate(row[0]) if movie in common_movies]
    return ratings

But again on the 3rd iteration, I received the same error.

Update - Data input and scenario scope (here)


Solution

  • What went wrong with your approach

    Ignoring the performance penalties of Python UDFs, there are two things that went wrong in your approach.

    1. apply, which is now map_rows, in the context you're using it expects the function to return a tuple where each element of the tuple becomes an output column. Your function returns a list, not a tuple, hence the "expected tuple, got list" error. If you change the return line to return (ratings_for_common_movies,) then it outputs a one-element tuple and will work (see the sketch just after this list).

    2. You can't add columns to polars dataframes with square bracket notation. The only thing that can be on the left side of the = is a df, never df['new_column']=<something>. If you're using an old version that still allows it, you shouldn't rely on it, in part because new versions don't allow it. That means you have to do something like df.with_columns(new_column=<some_expression>).
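
    For reference, here's a minimal sketch of your function with only the return line changed to a one-element tuple, which is what map_rows expects:

    def get_common_movie_ratings(row):
        common_movies = row[2]
        user_ratings = row[1]
        ratings_for_common_movies = [user_ratings[list(row[0]).index(movie)] for movie in common_movies]
        # returning a 1-tuple tells map_rows this is a single output column
        return (ratings_for_common_movies,)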

    To add a column to an existing df while using map_rows, you can use hstack like this:

    # assumes the tuple-returning version of get_common_movie_ratings from above
    df = df.hstack(df.map_rows(get_common_movie_ratings)).rename({'column_0': 'common_movie_ratings'})
    

    The above is really an anti-pattern: using any of map_rows, map_elements, etc. when a native approach could work will be slower and less efficient. Scroll to the bottom for a map_elements approach.

    Native solution preamble

    If we assume the lists are always 3 elements long, then you could do this:

    # this is the length of the user_movies lists
    n_count=3
    
    df.with_columns(
        # first gather the items from user_movies based on (yet to be created) 
        # indices list 
        pl.col('user_movies').list.gather(
            # use the walrus operator to create a new list which is the indices where
            # user_movies are in common_movies. This works by looping through
            # each element and checking whether it's in common_movies; when it is,
            # its position in the loop (n) is stored. n_count is the list size.
            (indices:=pl.concat_list(
                pl.when(
                    pl.col('user_movies').list.get(n).is_in(pl.col('common_movies'))
                )
                .then(pl.lit(n))
                for n in range(n_count)
            ).list.drop_nulls())
        ),
        # use the same indices list to gather the corresponding elements from user_ratings
        pl.col('user_ratings').list.gather(indices)
    )
    

    Note that we're generating the indices list by looping through a range from 0 to the length of the list as n, and when the item at the nth position of user_movies is in common_movies, that n is put in the indices list. There is unfortunately no .index-like method in polars for list-type columns so, without exploding the lists, this is the best way I can think of to create that indices list.
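
    To see what that intermediate result looks like, here's a small sketch that materializes just the indices expression for the sample frame (the indices column name is only for illustration):

    df.with_columns(
        indices=pl.concat_list(
            pl.when(
                pl.col('user_movies').list.get(n).is_in(pl.col('common_movies'))
            )
            .then(pl.lit(n))
            for n in range(3)
        ).list.drop_nulls()
    )
    # indices: [[0, 1], [1], [1, 2], [0], [0, 2]]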

    Native solution answer

    Polars itself can't set n_count dynamically per row, so we need to handle the different list lengths manually. By using lazy evaluation this is faster than other approaches, as it can compute each n_count case in parallel.

    (
        pl.concat([ # this is a list comprehension
            # From here to the "for n_count..." line is the same as the previous code
            # snippet except that, here, it's called inner_df and it's being 
            # made into a lazy frame 
            inner_df.lazy().with_columns(
            pl.col('user_movies').list.gather(
                (indices:=pl.concat_list(
                    pl.when(
                        pl.col('user_movies').list.get(n).is_in(pl.col('common_movies'))
                    )
                    .then(pl.lit(n))
                    for n in range(n_count)
                ).list.drop_nulls())
            ),
            pl.col('user_ratings').list.gather(indices)
        )
        # this is the iterating part of the list comprehension
        # it takes the original df, creates a column which is
        # a row index, then it creates a column which is the
        # length of the list, it then partitions up the df into
        # multiple dfs where each of the inner_dfs only has rows
        # where the list length is the same. By using as_dict=True
        # and .items(), it gives a convenient way to unpack the
        # n_count (length of the list) and the inner_df  
        for n_count, inner_df in (
            df
            .with_row_count('i') # original row position
            .with_columns(n_count=pl.col('user_movies').list.len())
            .partition_by('n_count', as_dict=True, include_key=False)
            .items())
        ])
        .sort('i') # sort by original row position
        .drop('i') # drop the row position column
        .collect() # run all of the queries in parallel
        )
    shape: (5, 3)
    ┌───────────────┬──────────────┬───────────────┐
    │ user_movies   ┆ user_ratings ┆ common_movies │
    │ ---           ┆ ---          ┆ ---           │
    │ list[i64]     ┆ list[f64]    ┆ list[i64]     │
    ╞═══════════════╪══════════════╪═══════════════╡
    │ [7064, 7153]  ┆ [5.0, 5.0]   ┆ [7064, 7153]  │
    │ [7]           ┆ [2.0]        ┆ [7]           │
    │ [110, 3927]   ┆ [4.0, 3.0]   ┆ [110, 3927]   │
    │ [2]           ┆ [3.5]        ┆ [2]           │
    │ [260, 195627] ┆ [1.0, 0.5]   ┆ [260, 195627] │
    └───────────────┴──────────────┴───────────────┘
    

    By converting to lazy in the first part of the concat, each frame can be calculated in parallel, where each frame is a subset based on the length of the list. It also allows the indices expression to be recognized as a common subexpression (CSER), which means it is only calculated once even though there are 2 references to it.

    Incidentally, for less code but more processing time, you could simply set n_count in the preamble section to n_count=df.select(n_count=pl.col('user_movies').list.len().max()).item() and then just run the rest of what's in that section. That approach will be much slower than this one because, for every row, it iterates through elements up to the max list length, which adds unnecessary checks. It also doesn't get the same parallelism. In other words, it's doing more work with fewer CPU cores working on it.

    Benchmarks

    Fake data creation

    import numpy as np

    n=10_000_000
    df = (
        pl.DataFrame({
            'user':np.random.randint(1,int(n/10),size=n),
            'user_movies':np.random.randint(1,50,n),
            'user_ratings':np.random.uniform(1,5, n),
            'keep':np.random.randint(1,100,n)
        })
        .group_by('user')
        .agg(
            pl.col('user_movies'),
            pl.col('user_ratings').round(1),
            common_movies=pl.col('user_movies').filter(pl.col('keep')>75)
            )
        .filter(pl.col('common_movies').list.len()>0)
        .drop('user')
        )
    print(df.head(10))
    shape: (5, 3)
    ┌────────────────┬───────────────────┬───────────────┐
    │ user_movies    ┆ user_ratings      ┆ common_movies │
    │ ---            ┆ ---               ┆ ---           │
    │ list[i64]      ┆ list[f64]         ┆ list[i64]     │
    ╞════════════════╪═══════════════════╪═══════════════╡
    │ [23, 35, … 22] ┆ [3.4, 1.6, … 4.0] ┆ [35]          │
    │ [30, 18, … 26] ┆ [4.9, 1.9, … 2.3] ┆ [10]          │
    │ [25, 19, … 29] ┆ [1.7, 1.7, … 1.1] ┆ [18, 40, 38]  │
    │ [31, 15, … 42] ┆ [2.9, 1.8, … 4.3] ┆ [31, 4, … 42] │
    │ [36, 16, … 49] ┆ [1.0, 2.0, … 4.2] ┆ [36]          │
    └────────────────┴───────────────────┴───────────────┘
    

    My method (16 threads): 1.92 s ± 195 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

    My method (8 threads): 2.31 s ± 175 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

    My method (4 threads): 3.14 s ± 221 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

    @jqurious: 2.73 s ± 130 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

    map_rows: 9.12 s ± 195 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

    Preamble with big n_count: 9.77 s ± 1.61 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

    My method and the map_rows each use about 1 GB of RAM but the explody one is closer to 3 GB.

    struct and map_elements

    Instead of map_rows, which is, imo, really clunky, you can use map_elements. It has its own clunkiness, as you often need to wrap your input in a struct, but you can add columns more cleanly and you don't have to rely on column position.

    For instance, you can define your function and use it as follows:

    def get_common_movie_ratings(row): # Each row is now a dict keyed by column name.
        common_movies = row['common_movies']
        user_ratings = row['user_ratings']
        ratings_for_common_movies = [user_ratings[list(row['user_movies']).index(movie)] for movie in common_movies]
        return ratings_for_common_movies
    df.with_columns(user_ratings=pl.struct(pl.all()).map_elements(get_common_movie_ratings))
    

    What's happening here is that map_elements can only be invoked from a single column, so if your custom function needs multiple inputs you can wrap them in a struct. The struct gets turned into a dict whose keys are the column names. This approach doesn't have any inherent performance benefit relative to map_rows; it's just, imo, better syntax.
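
    To make that concrete, here's roughly what the struct hands your function for the first row of the sample frame (a sketch; the exact Python types can vary slightly by polars version):

    # pl.struct(pl.all()) packs each row into a dict keyed by column name
    row = {
        "user_movies": [7064, 7153, 78009],
        "user_ratings": [5.0, 5.0, 5.0],
        "common_movies": [7064, 7153],
    }
    get_common_movie_ratings(row)  # -> [5.0, 5.0]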

    Lastly

    As @jqurious mentioned in the comments of his answer, this could almost certainly be streamlined in terms of both syntax and performance by incorporating this logic into the formation of these lists. In other words, you have step 1: ______ and step 2: this question. While I can only guess at what's happening in step 1, it is very likely that combining the two steps would be a worthwhile endeavor.