My dataset looks like this:
rows = [("user1", 1, 2, 10),
        ("user2", 2, 6, 27),
        ("user3", 3, 3, 21),
        ("user4", 4, 7, 44)]
columns = ["id", "val1", "val2", "val3"]
df = spark.createDataFrame(data=rows, schema=columns)
And I want to calculate cosine similarity scores between each pair of users. Kindly help in solving this.
PS: I do not want to use the sklearn library, as I am dealing with big data.
You can use a UDF and a pivot:
import numpy as np
from pyspark.sql import SparkSession, functions as F, types as T

# Cosine similarity between two equal-length numeric arrays
@F.udf(T.DoubleType())
def cos_sim(a, b):
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
# Columns representing the values of the vectors
vector_columns = [c for c in df.columns if c.startswith('val')]

df2 = (
    df.alias('a')
    .crossJoin(df.alias('b'))
    .withColumn(
        'cs',
        cos_sim(
            F.array(*[F.col(f'a.{c}') for c in vector_columns]),
            F.array(*[F.col(f'b.{c}') for c in vector_columns]),
        )
    )
    .groupby('a.id')
    .pivot('b.id')
    .sum('cs')
)
The result is:
+-----+------------------+------------------+------------------+------------------+
| id| user1| user2| user3| user4|
+-----+------------------+------------------+------------------+------------------+
|user1|1.0000000000000002|0.9994487303346109|0.9975694083904585|0.9991881714548081|
|user2|0.9994487303346109| 1.0|0.9947592087399117|0.9980077882931742|
|user3|0.9975694083904585|0.9947592087399117| 1.0|0.9985781309458447|
|user4|0.9991881714548081|0.9980077882931742|0.9985781309458447| 1.0|
+-----+------------------+------------------+------------------+------------------+
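As a sanity check (outside Spark, just plain NumPy on the sample data), you can reproduce any single cell of the matrix above; for example, the user1/user2 score:

```python
import numpy as np

# Vectors for user1 and user2 from the sample data
u1 = np.array([1, 2, 10])
u2 = np.array([2, 6, 27])

# Cosine similarity: dot(a, b) / (||a|| * ||b||)
cs = float(np.dot(u1, u2) / (np.linalg.norm(u1) * np.linalg.norm(u2)))
print(cs)  # matches the user1/user2 cell above
```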
Alternatively, you can use a plain PySpark implementation; which is preferable depends on how much data you have to process. UDFs are usually slower, but simpler and quicker to iterate with, especially when you want to try something out.
Here is a possible plain PySpark implementation:
from functools import reduce
from operator import add

def mynorm(*cols):
    # Euclidean (L2) norm of the given columns
    return F.sqrt(reduce(add, [F.pow(c, 2) for c in cols]))

def mydot(a_cols, b_cols):
    # Dot product of two equal-length lists of columns
    return reduce(add, [a * b for a, b in zip(a_cols, b_cols)])

def cos_sim_ps(a_cols, b_cols):
    return mydot(a_cols, b_cols) / (mynorm(*a_cols) * mynorm(*b_cols))
df3 = (
    df.alias('a')
    .crossJoin(df.alias('b'))
    .withColumn(
        'cs',
        cos_sim_ps(
            [F.col(f'a.{c}') for c in vector_columns],
            [F.col(f'b.{c}') for c in vector_columns],
        )
    )
    .groupby('a.id')
    .pivot('b.id')
    .sum('cs')
)
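The column-expression helpers above mirror the usual reduce/zip formulation in plain Python. A hypothetical pure-Python analogue (only to illustrate the arithmetic the column expressions perform, not part of the Spark code) would be:

```python
from functools import reduce
from math import sqrt
from operator import add

def norm(cols):
    # L2 norm via the same reduce(add, ...) pattern used on Spark columns
    return sqrt(reduce(add, [c ** 2 for c in cols]))

def dot(a_cols, b_cols):
    return reduce(add, [a * b for a, b in zip(a_cols, b_cols)])

def cos_sim_py(a, b):
    return dot(a, b) / (norm(a) * norm(b))

print(cos_sim_py([1, 2, 10], [2, 6, 27]))  # user1 vs user2, as in the table
```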