
Efficiently compute item-item collaborative filtering similarity using numba, polars and numpy


Disclaimer: This question is part of a thread that includes two other SO questions (q1, q2).

The data resemble the movie ratings from the ratings.csv file (~891 MB) of the ml-latest dataset.

I read the CSV file with the polars library like so:

movie_ratings = pl.read_csv(os.path.join(application_path + data_directory, "ratings.csv"))
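
Not from the original post, but since only the userId, movieId and rating columns are used later, a hedged variant that reads just those columns may reduce memory for the ~891 MB file:

import os
import polars as pl

# optional (assumption, not the author's code): read only the columns needed downstream
movie_ratings = pl.read_csv(
    os.path.join(application_path + data_directory, "ratings.csv"),
    columns=["userId", "movieId", "rating"],
)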

Let's assume we want to compute the similarity between the movies seen by user=1 (62 movies in this example) and the rest of the movies in the dataset. The dataset has ~83,000 movies, so for each other_movie (82,938 of them) we compute a similarity with each movie seen by user 1 (62 movies). The complexity is 62 x 82,938 iterations.

For this example, the benchmarks reported are only for 400 of the 82,938 other_movies.

To do so, I create two polars dataframes: one with the other_movies (~82,938 rows) and a second with only the movies seen by the user (62 rows).

user_ratings = movie_ratings.filter(pl.col("userId")==input_id) #input_id = 1 (data related to user 1)
user_rated_movies = list(user_ratings.select(pl.col("movieId")).to_numpy().ravel()) #movies seen by user1
potential_movies_to_recommend = list(
    movie_ratings.select("movieId").filter( ~(pl.col("movieId").is_in(user_rated_movies)) ).unique().sort("movieId").to_numpy().ravel()
)

items_metadata = (
    movie_ratings.filter(
        ~pl.col("movieId").is_in(user_rated_movies) #& pl.col("movieId").is_in(potential_movie_recommendations[:total_unseen_movies])
    )
    .group_by("movieId").agg(
        users_seen_movie=pl.col("userId").unique(),
        user_ratings=pl.col("rating")
    )
)

target_items_metadata = (
    movie_ratings.filter(
        pl.col("movieId").is_in(user_rated_movies) #& pl.col("movieId").is_in(potential_movie_recommendations[:total_unseen_movies])
    ).group_by("movieId").agg(
        users_seen_movie=pl.col("userId").unique(),
        user_ratings=pl.col("rating")
    )
)

The result is two polars dataframes with one row per movie and columns holding the users who have seen the movie and the rating from each of those users.
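
For reference, based on the group_by/agg above, both frames should have roughly this schema (an illustration, not output from the original post):

print(items_metadata.schema)
# e.g. {'movieId': Int64, 'users_seen_movie': List(Int64), 'user_ratings': List(Float64)}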


The first dataframe contains only the other_movies that we can potentially recommend to user1, since he/she has not seen them.

The second dataframe contains only the movies seen by the user.

Next, my approach is to iterate over each row of the first dataframe by applying a UDF.

item_metadata_similarity = (
    items_metadata.with_columns(
        similarity_score=pl.struct(pl.all()).map_elements(
            lambda row: item_compute_similarity_scoring_V2(row, similarity_metric, target_items_metadata),
            return_dtype=pl.List(pl.List(pl.Float64)),
            strategy="threading"
        )
    )
)

where item_compute_similarity_scoring_V2 is defined as:

def item_compute_similarity_scoring_V2(
    row,
    similarity_metric,  # passed through from the caller; not used in this version
    target_movies_metadata: pl.DataFrame
):
    users_item1 = np.asarray(row["users_seen_movie"])
    ratings_item1 = np.asarray(row["user_ratings"])
    computed_similarity: list=[]
    for row2 in target_movies_metadata.iter_rows(named=True): #iter over each row from the second dataframe with the movies seen by the user.
        users_item2=np.asarray(row2["users_seen_movie"])
        ratings_item2=np.asarray(row2["user_ratings"])
        r1, r2 = item_ratings(users_item1, ratings_item1, users_item2, ratings_item2)
        if r1.shape[0] != 0 and r2.shape[0] != 0:
            similarity_score = compute_similarity_score(r1, r2)
            if similarity_score > 0.0: #filter out negative or zero similarity scores
                computed_similarity.append((row2["movieId"], similarity_score))
    most_similar_pairs = sorted(computed_similarity, key=lambda x: x[1], reverse=True)
    return most_similar_pairs

and item_ratings and compute_similarity_score are defined as:

def item_ratings(u1: np.ndarray, r1: np.ndarray, u2: np.ndarray, r2: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    common_elements, indices1, indices2 = np.intersect1d(u1, u2, return_indices=True)
    sr1 = r1[indices1]
    sr2 = r2[indices2]
    assert len(sr1)==len(sr2), "ratings don't have same lengths"
    return sr1, sr2
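
# A hypothetical toy example (not from the original post) of how item_ratings
# aligns ratings on the users two movies have in common:
#   u1 = np.array([1, 3, 5, 7]); r1 = np.array([4.0, 3.0, 5.0, 2.0])
#   u2 = np.array([3, 4, 7]);    r2 = np.array([2.5, 4.0, 1.0])
#   item_ratings(u1, r1, u2, r2)  # -> (array([3., 2.]), array([2.5, 1. ]))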

@jit(nopython=True, parallel=True)
def compute_similarity_score(array1:np.ndarray, array2:np.ndarray) -> float:
    assert(array1.shape[0] == array2.shape[0])
    a1a2 = 0
    a1a1 = 0
    a2a2 = 0
    for i in range(array1.shape[0]):
        a1a2 += array1[i]*array2[i]
        a1a1 += array1[i]*array1[i]
        a2a2 += array2[i]*array2[i]
    cos_theta = 1.0
    if a1a1!=0 and a2a2!=0:
        cos_theta = float(a1a2/np.sqrt(a1a1*a2a2))
    return cos_theta
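
Note that with parallel=True, numba only parallelizes loops written with numba.prange; a plain range reduction like the one above runs single-threaded (numba typically emits a warning that the parallel semantics could not be applied). A minimal sketch of a parallel version, as an assumption rather than the original code:

import numpy as np
from numba import jit, prange

@jit(nopython=True, parallel=True)
def compute_similarity_score_prange(array1: np.ndarray, array2: np.ndarray) -> float:
    # scalar += reductions inside a prange loop are combined safely by numba
    a1a2 = 0.0
    a1a1 = 0.0
    a2a2 = 0.0
    for i in prange(array1.shape[0]):
        a1a2 += array1[i] * array2[i]
        a1a1 += array1[i] * array1[i]
        a2a2 += array2[i] * array2[i]
    if a1a1 != 0.0 and a2a2 != 0.0:
        return a1a2 / np.sqrt(a1a1 * a2a2)
    return 1.0

For vectors of only a few thousand common ratings the threading overhead may outweigh any gain, so this is worth benchmarking rather than assuming.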

The function iterates over each row of the second dataframe and, for each row, computes the similarity between the other_movie and the movie seen by the user. Thus, for 400 movies we do 400*62 iterations, generating 62 similarity scores per other_movie.

The result of each computation is a nested list of the form [[1, 0.20], [110, 0.34], ...] (up to 62 (movieId, score) pairs per other_movie).
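
Not part of the original code, but if a flat table is easier to work with downstream, the nested similarity_score column could be unnested with explode; a rough sketch assuming the item_metadata_similarity frame from above:

# each row of similarity_score is a list of [user_movie, score] pairs;
# explode once to get one pair per row, then split the pair into two columns
flat_scores = (
    item_metadata_similarity
    .select("movieId", "similarity_score")
    .explode("similarity_score")
    .with_columns(
        user_movie=pl.col("similarity_score").list.get(0).cast(pl.Int64),
        score=pl.col("similarity_score").list.get(1),
    )
    .drop("similarity_score")
)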


Benchmarks for 400 movies

  1. INFO - Item-Item: Computed similarity scores for 400 movies in: 0:05:49.887032
  2. ~6 minutes.
  3. ~5 GB of RAM used.

I would like to identify how I can improve the computation by using native polars commands or by exploiting the numba framework for parallelism.

Update - 2nd approach using to_numpy() operations without iter_rows() and map_elements()

user_ratings = movie_ratings.filter(pl.col("userId")==input_id) #input_id = 1
user_rated_movies = user_ratings.select(pl.col("movieId")).to_numpy().ravel()
potential_movies_to_recommend = list(
    movie_ratings.select("movieId").filter( ~(pl.col("movieId").is_in(user_rated_movies)) ).unique().sort("movieId").to_numpy().ravel()
)
items_metadata = (
    movie_ratings.filter(
        ~pl.col("movieId").is_in(user_rated_movies)
    )
)
# print(items_metadata.head(5))
target_items_metadata = (
    movie_ratings.filter(
        pl.col("movieId").is_in(user_rated_movies)
    )
)
# print(target_items_metadata.head(5))

With this second approach, items_metadata and target_items_metadata are two large polars tables.

My next step is to convert both tables into numpy.ndarrays with the to_numpy() method.

items_metadata_array = items_metadata.to_numpy()
target_items_metadata_array = target_items_metadata.to_numpy()
computed_similarity_scores:dict = {}
for i, other_movie in enumerate(potential_movies_to_recommend[:400]): #take the first 400 unseen movies by user 1
    mask = items_metadata_array[:, 1] == other_movie
    other_movies_chunk = items_metadata_array[mask]
    u1 = other_movies_chunk[:,0].astype(np.int32)
    r1 = other_movies_chunk[:,2].astype(np.float32)
    computed_similarity: list=[]
    for user_movie in user_rated_movies:
        print(user_movie)
        mask = target_items_metadata_array[:, 1] == user_movie
        target_movie_chunk = target_items_metadata_array[mask]
        u2 = target_movie_chunk[:,0].astype(np.int32)
        r2 = target_movie_chunk[:,2].astype(np.float32)
        common_r1, common_r2 = item_ratings(u1, r1, u2, r2)
        if common_r1.shape[0] != 0 and common_r2.shape[0] != 0:
            similarity_score = compute_similarity_score(common_r1, common_r2)
            if similarity_score > 0.0:
                computed_similarity.append((user_movie, similarity_score))
    most_similar_pairs = sorted(computed_similarity, key=lambda x: x[1], reverse=True)[:k_similar_user]
    computed_similarity_scores[str(other_movie)] = most_similar_pairs

Benchmarks of the second approach (~8.5 minutes, slower than the ~6 minutes of the first approach)

  • Item-Item: Computed similarity scores for 400 movies in: 0:08:50.537102

Update - 3rd approach using iter_rows() operations

My third approach gives better results than the previous two methods, finishing in approximately 2 minutes for user 1 and 400 movies.

items_metadata = (
    movie_ratings.filter(
        ~pl.col("movieId").is_in(user_rated_movies)
    )
    .group_by("movieId").agg(
        users_seen_movie=pl.col("userId").unique(),
        user_ratings=pl.col("rating")
    )
)

target_items_metadata = (
    movie_ratings.filter(
        pl.col("movieId").is_in(user_rated_movies)
    ).group_by("movieId").agg(
        users_seen_movie=pl.col("userId").unique(),
        user_ratings=pl.col("rating")
    )
)

items_metadata is the metadata of the other_movies not seen by user 1.

target_items_metadata is the metadata of the movies rated by user 1. By metadata I refer to the two aggregated .agg() columns, users_seen_movie and user_ratings.

Finally, I create two for loops using the iter_rows() method from polars:

def cosine_similarity_score(array1:np.ndarray, array2:np.ndarray) -> float:
    assert(array1.shape[0] == array2.shape[0])
    a1a2 = 0
    a1a1 = 0
    a2a2 = 0
    for i in range(array1.shape[0]):
        a1a2 += array1[i]*array2[i]
        a1a1 += array1[i]*array1[i]
        a2a2 += array2[i]*array2[i]
    # cos_theta = 1.0
    cos_theta = 0.0
    if a1a1!=0 and a2a2!=0:
        cos_theta = float(a1a2/np.sqrt(a1a1*a2a2))
    return max(0.0, cos_theta)

for row1 in items_metadata.iter_rows():
    computed_similarity: list = []
    for row2 in target_items_metadata.iter_rows():
        r1, r2 = item_ratings(np.asarray(row1[1]), np.asarray(row1[2]), np.asarray(row2[1]), np.asarray(row2[2]))
        similarity_score = 0.0  # default when the two movies share no users
        if r1.shape[0] != 0 and r2.shape[0] != 0:
            similarity_score = cosine_similarity_score(r1, r2)
        computed_similarity.append((row2[0], similarity_score if similarity_score > 0 else 0))
    computed_similarity_scores[str(row1[0])] = sorted(computed_similarity, key=lambda x: x[1], reverse=True)[:k_similar_user]

Benchmarks for 400 movies

  1. INFO - Item-Item: Computed similarity scores for 400 movies in: 0:01:50
  2. ~2 minutes.
  3. ~4.5 GB of RAM used.

Solution

  • I'm not too familiar with numba, so before trying to compare timings, the first thing I would try to do is create a "fully native" Polars approach:

    This is a direct translation of the current approach (i.e. it still contains the "double for loop") so it just serves as a baseline attempt.

    Because it uses the Lazy API, nothing in the loops is computed.

    That is all done when .collect() is called (which allows Polars to parallelize the work).

    The > 0.0 filtering for the similarity_score would be done after the results are collected.

    input_id = 1
    
    is_user_rating = pl.col("userId") == input_id
    
    can_recommend = (
        pl.col("movieId").is_in(pl.col("movieId").filter(is_user_rating)).not_()
    )
    
    cosine_similarity = (
        pl.col('rating').dot('rating_right') /  
        ( pl.col('rating').pow(2).sum().sqrt() * 
          pl.col('rating_right').pow(2).sum().sqrt() ) 
    )
    
    user_rated_movies = movie_ratings.filter(is_user_rating).select("movieId").to_series()
    
    potential_movies_to_recommend = (
        movie_ratings.filter(can_recommend).select(pl.col("movieId").unique().sort())
    )
    
    # use the Lazy API so we can compute in parallel
    df = movie_ratings.lazy()
    
    computed_similarity_scores = []
    for other_movie in potential_movies_to_recommend.head(1).to_series(): # .head(N) potential movies
        for user_movie in user_rated_movies:
            score = (
                df.filter(pl.col("movieId") == user_movie)
                  .join(
                     df.filter(pl.col("movieId") == other_movie),
                     on = "userId"
                  )
                  .select(cosine = cosine_similarity)
                  .select(user_movie=user_movie, other_movie=other_movie, similarity_score="cosine")
            )
            computed_similarity_scores.append(score)
            
    # All scores are computed in parallel
    computed_similarity_scores_polars = pl.concat(computed_similarity_scores).collect()
    
    shape: (62, 3)
    ┌────────────┬─────────────┬──────────────────┐
    │ user_movie ┆ other_movie ┆ similarity_score │
    │ ---        ┆ ---         ┆ ---              │
    │ i32        ┆ i32         ┆ f64              │
    ╞════════════╪═════════════╪══════════════════╡
    │ 1          ┆ 2           ┆ 0.95669          │
    │ 110        ┆ 2           ┆ 0.950086         │
    │ 158        ┆ 2           ┆ 0.957631         │
    │ 260        ┆ 2           ┆ 0.945542         │
    │ …          ┆ …           ┆ …                │
    │ 49647      ┆ 2           ┆ 0.9411           │
    │ 52458      ┆ 2           ┆ 0.955353         │
    │ 53996      ┆ 2           ┆ 0.930388         │
    │ 54259      ┆ 2           ┆ 0.95469          │
    └────────────┴─────────────┴──────────────────┘
    

    Testing with .head(100), I get a 58s runtime compared to a 111s runtime for your example; memory consumption is the same.
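
    For example (not shown in the original answer), the > 0.0 filtering mentioned above can then be applied to the collected frame:

    positive_scores = computed_similarity_scores_polars.filter(
        pl.col("similarity_score") > 0.0
    )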

    duckdb

    As a comparison, duckdb with the first 400 movies (.head(400), i.e. limit 400 in the query below) runs in 5s.
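
    The query reads from a parquet file; presumably the ratings frame was exported to parquet beforehand, e.g. (an assumption, this step is not shown in the original answer):

    movie_ratings.write_parquet("imdb.parquet")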

    import duckdb
    
    df = duckdb.sql("""
    with 
       df     as (from 'imdb.parquet'),
       user   as (from df where movieId in (from df select movieId where userId = 1)),
       movies as (from df where movieId not in (from df select movieId where userId = 1)),
       other  as (from df where movieId in (from movies select distinct movieId order by movieId limit 400))
       
    from
       user join other using (userId)
       
    select   
       user.movieId user_movie,
       other.movieId other_movie,
       list_cosine_similarity(
          list(user.rating), list(other.rating)
       ) similarity_score
       
    group by 
       user_movie, other_movie   
    order by 
       user_movie, other_movie
    """).pl()
    
    shape: (24_764, 3)
    ┌────────────┬─────────────┬──────────────────┐
    │ user_movie ┆ other_movie ┆ similarity_score │
    │ ---        ┆ ---         ┆ ---              │
    │ i64        ┆ i64         ┆ f64              │
    ╞════════════╪═════════════╪══════════════════╡
    │ 1          ┆ 2           ┆ 0.95669          │
    │ 1          ┆ 3           ┆ 0.941348         │
    │ 1          ┆ 4           ┆ 0.92169          │
    │ 1          ┆ 5           ┆ 0.943999         │
    │ …          ┆ …           ┆ …                │
    │ 54259      ┆ 407         ┆ 0.941241         │
    │ 54259      ┆ 408         ┆ 0.934745         │
    │ 54259      ┆ 409         ┆ 0.937361         │
    │ 54259      ┆ 410         ┆ 0.94937          │
    └────────────┴─────────────┴──────────────────┘
    Elapsed time: 5.02638 seconds