I have the following polars DF in Python
```python
df = pl.DataFrame({
    "user_movies": [[7064, 7153, 78009], [6, 7, 1042], [99, 110, 3927], [2, 11, 152081], [260, 318, 195627]],
    "user_ratings": [[5.0, 5.0, 5.0], [4.0, 2.0, 4.0], [4.0, 4.0, 3.0], [3.5, 3.0, 4.0], [1.0, 4.5, 0.5]],
    "common_movies": [[7064, 7153], [7], [110, 3927], [2], [260, 195627]]
})
print(df.head())
```
I want to create a new column named "common_movie_ratings" that takes, from each ratings list, only the ratings at the positions of the movies that appear in common_movies. For example, the first row should return only the ratings for movies [7064, 7153], the second row the rating for movie [7], and so on and so forth.
For this reason, I created the following function:
```python
def get_common_movie_ratings(row):  # each row is a tuple of arrays
    common_movies = row[2]  # index 2 of the tuple is the common_movies column
    user_ratings = row[1]
    ratings_for_common_movies = [user_ratings[list(row[0]).index(movie)] for movie in common_movies]
    return ratings_for_common_movies
```
Finally, I apply the UDF on the dataframe like

```python
df["common_movie_ratings"] = df.apply(get_common_movie_ratings, return_dtype=pl.List(pl.Float64))
```

Every time I apply the function, on the 3rd iteration/row I receive the following error:

```
expected tuple, got list
```
I have also tried a different approach for the UDF function like

```python
def get_common_movie_ratings(row):
    common_movies = row[2]
    user_ratings = row[1]
    ratings = [user_ratings[i] for i, movie in enumerate(row[0]) if movie in common_movies]
    return ratings
```
But again on the 3rd iteration, I received the same error.
Ignoring the performance penalties of Python UDFs, there are two things that went wrong in your approach.
1. `apply` (which is now `map_rows`), in the context you're trying to use it, expects the output to be a tuple where each element of the tuple is an output column. Your function doesn't output a tuple. If you change the return line to `return (ratings_for_common_movies,)` then it outputs a tuple and will work.

2. You can't add columns to polars dataframes with square bracket notation. The only thing that can be on the left side of the `=` is a df, never `df['new_column'] = <something>`. If you're using an old version that does allow it then you shouldn't, in part because new versions don't allow it. That means you have to do something like `df.with_columns(new_column=<some_expression>)`.
In the case of adding a column to an existing df while using `map_rows`, you can use `hstack` like:

```python
df = df.hstack(df.map_rows(get_common_movie_ratings)).rename({'column_0': 'common_movie_ratings'})
```
The above is really an anti-pattern: using any of `map_rows`, `map_elements`, etc. when a native approach could work will be slower and less efficient. Scroll to the bottom for a `map_elements` approach.
If we assume the lists are always 3 long then you could do this...
```python
# this is the length of the user_movies lists
n_count = 3

df.with_columns(
    # first gather the items from user_movies based on the (yet to be created)
    # indices list
    pl.col('user_movies').list.gather(
        # use the walrus operator to create a new list which holds the indices where
        # user_movies are in common_movies. This works by looping through
        # each element and checking if it's in common_movies; when it is,
        # its place in the loop (n) is stored. n_count is the list size.
        (indices := pl.concat_list(
            pl.when(
                pl.col('user_movies').list.get(n).is_in(pl.col('common_movies'))
            )
            .then(pl.lit(n))
            for n in range(n_count)
        ).list.drop_nulls())
    ),
    # use the same indices list to gather the corresponding elements from user_ratings
    pl.col('user_ratings').list.gather(indices)
)
```
Note that we're generating the indices list by looping through a range from 0 to the length of the list as `n`, and when the item at the `n`th position of `user_movies` is in `common_movies`, that `n` is put in the indices list. There is unfortunately no `.index`-like method in polars for `list`-type columns so, without exploding the lists, this is the best way I can think of to create that indices list.
Polars itself can't recursively set `n_count` so we need to do it manually. By using lazy evaluation this is faster than other approaches as it can compute each `n_count` case in parallel.
```python
(
    pl.concat([  # this is a list comprehension
        # From here to the "for n_count..." line it is the same as the previous
        # code snippet except that, here, it's called inner_df and it's being
        # made into a lazy frame
        inner_df.lazy().with_columns(
            pl.col('user_movies').list.gather(
                (indices := pl.concat_list(
                    pl.when(
                        pl.col('user_movies').list.get(n).is_in(pl.col('common_movies'))
                    )
                    .then(pl.lit(n))
                    for n in range(n_count)
                ).list.drop_nulls())
            ),
            pl.col('user_ratings').list.gather(indices)
        )
        # this is the iterating part of the list comprehension:
        # it takes the original df, creates a column which is
        # a row index, then it creates a column which is the
        # length of the list, then it partitions up the df into
        # multiple dfs where each of the inner_dfs only has rows
        # where the list length is the same. By using as_dict=True
        # and .items(), it gives a convenient way to unpack the
        # n_count (length of the list) and the inner_df
        for n_count, inner_df in (
            df
            .with_row_count('i')  # original row position
            .with_columns(n_count=pl.col('user_movies').list.len())
            .partition_by('n_count', as_dict=True, include_key=False)
            .items()
        )
    ])
    .sort('i')   # sort by original row position
    .drop('i')   # drop the row position column
    .collect()   # run all of the queries in parallel
)
```
```
shape: (5, 3)
┌───────────────┬──────────────┬───────────────┐
│ user_movies   ┆ user_ratings ┆ common_movies │
│ ---           ┆ ---          ┆ ---           │
│ list[i64]     ┆ list[f64]    ┆ list[i64]     │
╞═══════════════╪══════════════╪═══════════════╡
│ [7064, 7153]  ┆ [5.0, 5.0]   ┆ [7064, 7153]  │
│ [7]           ┆ [2.0]        ┆ [7]           │
│ [110, 3927]   ┆ [4.0, 3.0]   ┆ [110, 3927]   │
│ [2]           ┆ [3.5]        ┆ [2]           │
│ [260, 195627] ┆ [1.0, 0.5]   ┆ [260, 195627] │
└───────────────┴──────────────┴───────────────┘
```
By converting to `lazy` in the first part of the `concat`, each frame can be calculated in parallel, where each frame is a subset based on the length of the list. It also allows the `indices` expression to be treated as a common subexpression (CSE), which means it is only calculated once even though there are 2 references to it.
Incidentally, for less code but more processing time, you could simply set `n_count` in the preamble section to `n_count=df.select(n_count=pl.col('user_movies').list.len().max()).item()` and then just run the rest of what's in that section. That approach will be much slower than this one since, for every row, it iterates through elements up to the max list length, which adds unnecessary checks. It also doesn't get the same parallelism. In other words, it's doing more work with fewer CPU cores working on it.
Fake data creation
```python
import numpy as np
import polars as pl

n = 10_000_000
df = (
    pl.DataFrame({
        'user': np.random.randint(1, int(n / 10), size=n),
        'user_movies': np.random.randint(1, 50, n),
        'user_ratings': np.random.uniform(1, 5, n),
        'keep': np.random.randint(1, 100, n)
    })
    .group_by('user')
    .agg(
        pl.col('user_movies'),
        pl.col('user_ratings').round(1),
        common_movies=pl.col('user_movies').filter(pl.col('keep') > 75)
    )
    .filter(pl.col('common_movies').list.len() > 0)
    .drop('user')
)
print(df.head(5))
```
```
shape: (5, 3)
┌────────────────┬───────────────────┬───────────────┐
│ user_movies    ┆ user_ratings      ┆ common_movies │
│ ---            ┆ ---               ┆ ---           │
│ list[i64]      ┆ list[f64]         ┆ list[i64]     │
╞════════════════╪═══════════════════╪═══════════════╡
│ [23, 35, … 22] ┆ [3.4, 1.6, … 4.0] ┆ [35]          │
│ [30, 18, … 26] ┆ [4.9, 1.9, … 2.3] ┆ [10]          │
│ [25, 19, … 29] ┆ [1.7, 1.7, … 1.1] ┆ [18, 40, 38]  │
│ [31, 15, … 42] ┆ [2.9, 1.8, … 4.3] ┆ [31, 4, … 42] │
│ [36, 16, … 49] ┆ [1.0, 2.0, … 4.2] ┆ [36]          │
└────────────────┴───────────────────┴───────────────┘
```
- My method (16 threads): 1.92 s ± 195 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
- My method (8 threads): 2.31 s ± 175 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
- My method (4 threads): 3.14 s ± 221 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
- @jqurious: 2.73 s ± 130 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
- `map_rows`: 9.12 s ± 195 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
- Preamble with big `n_count`: 9.77 s ± 1.61 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

My method and the `map_rows` approach each use about 1 GB of RAM but the explode-based one is closer to 3 GB.
Instead of using `map_rows`, which is, imo, really clunky, you can instead use `map_elements`. It has its own clunkiness, as you often need to wrap your input in a struct, but you can add columns more cleanly and you don't have to rely on column position.
For instance you can define your function and use it as follows:
```python
def get_common_movie_ratings(row):  # each row is a dict keyed by column name
    common_movies = row['common_movies']
    user_ratings = row['user_ratings']
    ratings_for_common_movies = [user_ratings[list(row['user_movies']).index(movie)] for movie in common_movies]
    return ratings_for_common_movies

df.with_columns(user_ratings=pl.struct(pl.all()).map_elements(get_common_movie_ratings))
```
What's happening here is that `map_elements` can only be invoked from a single column, so if your custom function needs multiple inputs you can wrap them in a struct. The struct gets turned into a dict where the keys are the column names. This approach doesn't have any inherent performance benefit relative to `map_rows`; it's just, imo, better syntax.
As @jqurious mentioned in the comments of his answer, this could almost certainly be streamlined in terms of both syntax and performance by incorporating this logic into the formation of these lists. In other words, you have step 1: ______, step 2: this question. While I can only guess at what's happening in step 1, it is very likely that combining the two steps would be a worthwhile endeavor.