Tags: machine-learning, pyspark, similarity, cosine-similarity

Calculating cosine similarity in a PySpark DataFrame


My dataset looks like this:

rows = [("user1",1,2,10), 
        ("user2",2,6,27), 
        ("user3",3,3,21), 
        ("user4",4,7,44) 
      ]
columns = ["id","val1","val2","val3"]

df = spark.createDataFrame(data=rows, schema = columns)

+-----+----+----+----+
|   id|val1|val2|val3|
+-----+----+----+----+
|user1|   1|   2|  10|
|user2|   2|   6|  27|
|user3|   3|   3|  21|
|user4|   4|   7|  44|
+-----+----+----+----+

And I want to calculate pairwise cosine similarity scores between all users, i.e. a user-by-user similarity matrix.

Kindly help in solving this.

PS: I do not want to use the sklearn library, as I am dealing with big data.


Solution

  • You can use a UDF and a pivot:

    import numpy as np
    
    from pyspark.sql import SparkSession, functions as F, types as T
    
    # UDF computing the cosine similarity of two equal-length vectors
    @F.udf(T.DoubleType())
    def cos_sim(a, b):
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
    
    # Columns representing the values of the vectors
    vector_columns = [c for c in df.columns if c.startswith('val')]
    
    # Cross join every user with every other user, compute the cosine
    # similarity for each pair, then pivot into a user-by-user matrix.
    df2 = (
        df.alias('a')
        .crossJoin(df.alias('b'))
        .withColumn(
            'cs',
            cos_sim(
                F.array(*[F.col(f'a.{c}') for c in vector_columns]),
                F.array(*[F.col(f'b.{c}') for c in vector_columns]),
            )
        )
        .groupby('a.id')
        .pivot('b.id')
        # Each (a.id, b.id) pair occurs exactly once, so sum() simply
        # keeps the single similarity value per cell.
        .sum('cs')
    )
    

    The result is:

    +-----+------------------+------------------+------------------+------------------+
    |   id|             user1|             user2|             user3|             user4|
    +-----+------------------+------------------+------------------+------------------+
    |user1|1.0000000000000002|0.9994487303346109|0.9975694083904585|0.9991881714548081|
    |user2|0.9994487303346109|               1.0|0.9947592087399117|0.9980077882931742|
    |user3|0.9975694083904585|0.9947592087399117|               1.0|0.9985781309458447|
    |user4|0.9991881714548081|0.9980077882931742|0.9985781309458447|               1.0|
    +-----+------------------+------------------+------------------+------------------+
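
    The diagonal entries are not always exactly 1.0 (e.g. 1.0000000000000002 for user1) because of floating-point rounding. If you want a cleaner display, you can round the pivoted columns, for example:

    df2.select(
        'id',
        *[F.round(c, 6).alias(c) for c in df2.columns if c != 'id']
    ).show()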
    

    Alternatively, you can use a plain PySpark implementation built from native column expressions instead of a Python UDF; which approach to prefer depends on how much data you have to process. UDFs are usually slower, but they are simpler and quicker to experiment with when you just want to try something out.

    Here is a possible plain PySpark implementation:

    from functools import reduce
    from operator import add

    # Euclidean norm of the vector formed by the given columns
    def mynorm(*cols):
        return F.sqrt(reduce(add, [F.pow(c, 2) for c in cols]))

    # Dot product of two equal-length lists of columns
    def mydot(a_cols, b_cols):
        return reduce(add, [a * b for a, b in zip(a_cols, b_cols)])

    # Cosine similarity expressed with native Spark column expressions
    def cos_sim_ps(a_cols, b_cols):
        return mydot(a_cols, b_cols) / (mynorm(*a_cols) * mynorm(*b_cols))
        
    
    # Same cross join + pivot as before, but the similarity is computed
    # with native column expressions instead of a Python UDF.
    df3 = (
        df.alias('a')
        .crossJoin(df.alias('b'))
        .withColumn(
            'cs',
            cos_sim_ps(
                [F.col(f'a.{c}') for c in vector_columns],
                [F.col(f'b.{c}') for c in vector_columns],
            )
        )
        .groupby('a.id')
        .pivot('b.id')
        .sum('cs')
    )
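
    As a quick sanity check (a sketch, assuming df2 and df3 from the snippets above are both still in scope), you can compare the two matrices cell by cell; the differences should be at the level of floating-point rounding:

    user_cols = [c for c in df2.columns if c != 'id']

    # Absolute per-cell difference between the UDF result (df2)
    # and the plain pyspark result (df3).
    diffs = (
        df2.alias('u')
        .join(df3.alias('p'), on='id')
        .select('id', *[F.abs(F.col(f'u.{c}') - F.col(f'p.{c}')).alias(c) for c in user_cols])
    )

    diffs.show()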