Tags: python, apache-spark, pyspark, apache-spark-ml

What's the best way to do matrix multiplication between two lists of SparseVectors with the DataFrame-based API in PySpark?


I have 2 DataFrames with the same structure: DataFrame[id: bigint, tfidf_features: vector]

I need to multiply rows in dataframe1 with rows in dataframe2. I can use a loop and do things like: dataframe1.collect()[i]['tfidf_features'].dot(dataframe2.collect()[j]['tfidf_features']).

However, I would like to use matrix multiplication, something equivalent to: np.matmul(dataframe1_tfidf_features, dataframe2_tfidf_features.T).
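
For concreteness, a toy pair of DataFrames with this schema can be built like this (an existing spark session is assumed; the ids and vector values are made up):

    from pyspark.ml.linalg import SparseVector

    df1 = spark.createDataFrame(
        [(0, SparseVector(4, {0: 1.0, 2: 3.0})),
         (1, SparseVector(4, {1: 2.0}))],
        ["id", "tfidf_features"])
    df2 = spark.createDataFrame(
        [(0, SparseVector(4, {0: 5.0})),
         (1, SparseVector(4, {2: 4.0, 3: 1.0}))],
        ["id", "tfidf_features"])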


Solution

  • You have two choices:
    1. mllib.linalg.distributed.BlockMatrix: convert both DataFrames to block matrices and use multiply (BlockMatrix belongs to the RDD-based mllib API, so a vector conversion is needed first; see the sketch after this list).

    from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

    bm1 = IndexedRowMatrix(df1.rdd.map(lambda x: IndexedRow(x[0], x[1]))).toBlockMatrix()
    bm2 = IndexedRowMatrix(df2.rdd.map(lambda x: IndexedRow(x[0], x[1]))).toBlockMatrix()
    # Transpose the second matrix so the product matches np.matmul(m1, m2.T)
    bm_result = bm1.multiply(bm2.transpose())
    

    2. pyspark.sql.DataFrame.crossJoin: cross-join both DataFrames, compute each element of the resulting matrix with a UDF, then assemble and order each output row with collect_list and a sort (a quick NumPy check follows after the list).

    from pyspark.sql.functions import col, collect_list, struct, udf
    from pyspark.sql.types import ArrayType, DoubleType

    df = df1.crossJoin(df2.select(col("id").alias("id2"),
                                  col("tfidf_features").alias("tfidf_features2")))

    # Vector.dot gives the dot product of the two vectors for each (id, id2) pair
    udf_mult = udf(lambda x, y: float(x.dot(y)), DoubleType())
    df = (df.withColumn("val", udf_mult("tfidf_features", "tfidf_features2"))
            .drop("tfidf_features", "tfidf_features2"))

    # Gather the elements of each output row, keyed by the column index id2
    st = struct(["id2", "val"]).alias("map")
    df = df.select("id", st).groupBy("id").agg(collect_list("map").alias("list"))

    # collect_list gives no ordering guarantee, so sort each row's entries by id2
    def sort(x):
        x = sorted(x, key=lambda a: a[0])
        return [a[1] for a in x]

    udf_sort = udf(sort, ArrayType(DoubleType()))
    df = df.withColumn("list", udf_sort("list"))
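
For choice 2, assuming the ids are dense (0..n-1), one way to sanity-check the result is to collect the rows in id order and compare against NumPy (a sketch; m1 and m2 stand for the dense versions of the two feature matrices):

    import numpy as np

    # Collect rows in id order and stack them into a local matrix
    rows = df.orderBy("id").collect()
    result = np.array([r["list"] for r in rows])
    # result should equal np.matmul(m1, m2.T)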
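
For choice 1, note that BlockMatrix and IndexedRowMatrix come from the RDD-based mllib API, while DataFrames built with the ml API hold pyspark.ml.linalg vectors. A minimal sketch of the conversion (assuming the vector column is named tfidf_features, as in the question) and of pulling a small result back to the driver:

    from pyspark.mllib.util import MLUtils

    # Convert ml vectors to mllib vectors before building the block matrices
    df1 = MLUtils.convertVectorColumnsFromML(df1, "tfidf_features")
    df2 = MLUtils.convertVectorColumnsFromML(df2, "tfidf_features")

    # Only when the product is small enough to fit on the driver:
    result = bm_result.toLocalMatrix().toArray()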