Tags: python, apache-spark, pyspark, apache-spark-sql, cosine-similarity

How to get cosine similarity scores for all users and all items in PySpark, given user and item embeddings?


I have a users DataFrame:

df1 = spark.createDataFrame([
    ("u1", [0., 2., 3.]),
    ("u2", [1., 0., 0.]),
    ("u3", [0., 0., 3.]),
    ],
    ['user_id', 'features'])

df1.printSchema()
df1.show(truncate=False)

Output:

root
 |-- user_id: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: double (containsNull = true)

+-------+---------------+
|user_id|features       |
+-------+---------------+
|u1     |[0.0, 2.0, 3.0]|
|u2     |[1.0, 0.0, 0.0]|
|u3     |[0.0, 0.0, 3.0]|
+-------+---------------+

And I have an items DataFrame:

df2 = spark.createDataFrame([
    ("i1", [0., 2., 3.]),
    ("i2", [1.1, 0., 0.]),
    ("i3", [0., 0., 3.1]),
    ],
    ['item_id', 'features'])

df2.printSchema()
df2.show(truncate=False)

Output:

root
 |-- item_id: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: double (containsNull = true)

+-------+---------------+
|item_id|features       |
+-------+---------------+
|i1     |[0.0, 2.0, 3.0]|
|i2     |[1.1, 0.0, 0.0]|
|i3     |[0.0, 0.0, 3.1]|
+-------+---------------+

How do I calculate the cosine similarity score for all the user-item pairs, such that it becomes easy for me to rank the items for every user?
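Cosine similarity of two vectors is their dot product divided by the product of their L2 norms. As a sanity check for whatever Spark code you end up with, here is a minimal plain-Python sketch (no Spark, no sklearn) that computes all nine user-item scores for the toy data above. Treating a zero vector as similarity 0.0 is an assumption made here only to avoid dividing by zero.

```python
import math

# Toy embeddings copied from the question's df1 and df2.
users = {"u1": [0.0, 2.0, 3.0], "u2": [1.0, 0.0, 0.0], "u3": [0.0, 0.0, 3.0]}
items = {"i1": [0.0, 2.0, 3.0], "i2": [1.1, 0.0, 0.0], "i3": [0.0, 0.0, 3.1]}


def cosine(a, b):
    """Cosine similarity: dot(a, b) / (||a|| * ||b||)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    # Assumption: a zero vector has no direction, so report 0.0 instead of dividing by zero.
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


# One (user_id, item_id, cosine_similarity) tuple per user-item pair, like the target table.
rows = [(u, i, cosine(uv, iv)) for u, uv in users.items() for i, iv in items.items()]

for row in rows:
    print(row)
```

For example, u1 = [0, 2, 3] and i3 = [0, 0, 3.1] give 9.3 / (sqrt(13) * 3.1) = 3 / sqrt(13), roughly 0.832.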

The final DataFrame should look something like this:

+-------+-------+-----------------+
|user_id|item_id|cosine_similarity|
+-------+-------+-----------------+
|u1     |     i1|      some number|
|u1     |     i2|      some number|
|u1     |     i3|      some number|
|u2     |     i1|      some number|
|u2     |     i2|      some number|
|u2     |     i3|      some number|
|u3     |     i1|      some number|
|u3     |     i2|      some number|
|u3     |     i3|      some number|
+-------+-------+-----------------+

Solution

  • Here is a way using sklearn and the underlying RDD:

    from pyspark.sql import functions as F
    from sklearn.metrics.pairwise import cosine_similarity
    
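    # Pair every user with every item (Cartesian product); rename the item vector to avoid a name clash
    df = df1.crossJoin(df2.select('item_id', F.col("features").alias("features_item")))
    
    # Compute cosine similarity row by row with sklearn (sklearn must be installed on every executor)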
    result = df.rdd.map(lambda x: (x['user_id'], x['item_id'],
                                   float(
                                       cosine_similarity(
                                           [x['features']],
                                           [x['features_item']]
                                       )[0,0]
                                   )
                                  )
                       ).toDF(schema=['user_id', 'item_id', 'cosine_similarity'])