Turn user product views into network matrix/graph in python spark (pyspark)


I'm working with website data that includes user IDs and the products/items those users viewed. I've created a PySpark DataFrame that looks something like this:

+--------+----------+-------+----------+---------+
|  UserId|  productA|  itemB|  articleC|  objectD|
+--------+----------+-------+----------+---------+
|   user1|         1|      1|      null|     null|
|   user2|         1|      1|      null|     null|
|   user3|      null|      1|         1|     null|
|   user4|      null|   null|      null|        1|
+--------+----------+-------+----------+---------+

where a 1 means that the user has viewed that product at least once, and null means that the user has NOT viewed that product. There are several hundred products/items and millions of users (this is just a simplified example).

I want to perform an operation in pyspark to get a DataFrame like this:

+-----------+----------+-------+----------+---------+
|           |  productA|  itemB|  articleC|  objectD|
+-----------+----------+-------+----------+---------+
|   productA|         2|      2|         0|        0|
|      itemB|         2|      3|         1|        0|
|   articleC|         0|      1|         1|        0|
|    objectD|         0|      0|         0|        1|
+-----------+----------+-------+----------+---------+

This DataFrame shows, for each pair of products/items, the number of users who viewed both. The diagonal is simply the number of users who viewed each product; the interesting part is the symmetric off-diagonal values. In this simplified example you can see that both users who viewed productA also viewed itemB, but of the 3 users who viewed itemB only 2 viewed productA.

I created a VERY inefficient routine to calculate this, but with the size of the data set it takes ~22 hours to complete. How can I utilize pyspark's capabilities to make the computation below run faster?

import numpy as np
import pandas as pd
import pyspark.sql.functions as F

# df_pivot is the name of the first Dataframe in my explanation above
columns = [c for c in df_pivot.columns]
cols = columns[1:]
net = pd.DataFrame(np.zeros((len(cols), len(cols))), index=cols, columns=cols)

for i in range(len(cols)):
  c = cols[i]
  cs = cols[i:]
  print(f'{i + 1}: {c}')
  sum_row = df_pivot.where(F.col(c).isNotNull())\
                    .select(*cs)\
                    .groupBy().sum().collect()[0]\
                    .asDict()
  
  sum_row = {k.replace('sum(', '')[:-1]: v for k, v in sum_row.items()}
  values = [sum_row[x] for x in cs]
  net.loc[c, cs] = values
  net.loc[cs, c] = values

net.head()

UPDATE

After speaking with a coworker, we've found a way to do this (if we can get the data into a pandas DataFrame without memory errors): convert the data into a scipy csc_matrix and then take the gramian of the matrix like this:

gramian = sp_csc.transpose().dot(sp_csc)

where sp_csc is the scipy "Compressed Sparse Column matrix".
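
As a small illustration of that one-liner, here is a sketch on the toy data from the top of the question. The dense array `x` is a hypothetical stand-in for the pivoted data after it has been collected locally, with nulls already mapped to 0; it is not the real data set.

```python
import numpy as np
from scipy import sparse

# Toy stand-in for the user-by-product data (rows: user1..user4,
# columns: productA, itemB, articleC, objectD). 1 = viewed, 0 = null.
x = np.array(
    [
        [1, 1, 0, 0],
        [1, 1, 0, 0],
        [0, 1, 1, 0],
        [0, 0, 0, 1],
    ],
    dtype=np.int64,
)

# Compressed Sparse Column matrix, as in the snippet above.
sp_csc = sparse.csc_matrix(x)

# Gramian X^T X: entry (i, j) counts the users who viewed both product i and product j.
gramian = sp_csc.transpose().dot(sp_csc)

print(gramian.toarray())
```

The printed matrix should match the desired output table above, with the per-product view counts on the diagonal.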

Forcing the pyspark DataFrame into pandas still seems limiting based on the size of the data. Is there a better way, in pyspark, to calculate the gramian (dot product of the transpose of a pyspark DataFrame and the pyspark DataFrame itself)?

UPDATE 2

I found a way to make the original code/loop run much faster: I needed to cache the df_pivot DataFrame with df_pivot.cache() before the loop. Because pyspark evaluates lazily, every iteration of the loop was recomputing all of the steps that build df_pivot. Although this solves my immediate need of calculating this fast enough, I'd still be interested in how someone might do this in pyspark, perhaps with a parallelize, map, and reduce routine?
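
One way to picture the map/reduce idea is that the gramian of the whole data set is the sum of the gramians of its partitions. The sketch below shows that with plain numpy on the toy data; the function names `partition_gramian` and `add_matrices` are hypothetical, and the two hard-coded lists only imitate Spark partitions. On a cluster the same two functions could presumably be passed to `df_pivot.rdd.mapPartitions(...)` and `.reduce(...)`, but that part is an untested assumption.

```python
from functools import reduce

import numpy as np


def partition_gramian(rows):
    """Map step: turn one partition of (UserId, v1, v2, ...) rows into a single X^T X matrix."""
    # None (Spark null) means "not viewed"; anything else means "viewed".
    x = np.array(
        [[0 if v is None else 1 for v in row[1:]] for row in rows],
        dtype=np.int64,
    )
    if x.size:  # emit nothing for an empty partition
        yield x.T @ x


def add_matrices(a, b):
    """Reduce step: partial gramians simply add up."""
    return a + b


# Local stand-in for two Spark partitions of df_pivot (toy data from the question).
partitions = [
    [("user1", 1, 1, None, None), ("user2", 1, 1, None, None)],
    [("user3", None, 1, 1, None), ("user4", None, None, None, 1)],
]

partials = [m for part in partitions for m in partition_gramian(part)]
gramian = reduce(add_matrices, partials)
print(gramian)
```

Working per partition rather than per row keeps the number of intermediate matrices small: each partition produces one products-by-products matrix, however many users it holds.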


Solution

  • IIUC, you can unpivot the original DataFrame df_pivot into (userId, target) pairs, do a full-outer self-join on userId, and then pivot the result again.

    from pyspark.sql import functions as F
    
    # list of columns to do pivot
    cols = df_pivot.columns[1:]
    
    # normalize the df_pivot to userId vs target
    df1 = df_pivot.select(
        'userId', 
        # raw string so that '\|' reaches the regex engine without an invalid-escape warning
        F.explode(F.split(F.concat_ws('|', *[F.when(F.col(c).isNotNull(), F.lit(c)) for c in cols]), r'\|')).alias('target')
    )
    #df1.show()
    #+------+--------+
    #|userId|  target|
    #+------+--------+
    #| user1|productA|
    #| user1|   itemB|
    #| user2|productA|
    #| user2|   itemB|
    #| user3|   itemB|
    #| user3|articleC|
    #| user4| objectD|
    #+------+--------+
    
    # self full-outer join
    df2 = df1.join(df1.withColumnRenamed('target','target_1'),'userId','full')
    
    # pivot
    df_new = df2.groupby('target') \
        .pivot('target_1', cols) \
        .agg(F.countDistinct('userId')) \
        .fillna(0, subset=cols)
    #+--------+--------+-----+--------+-------+
    #|  target|productA|itemB|articleC|objectD|
    #+--------+--------+-----+--------+-------+
    #|productA|       2|    2|       0|      0|
    #|   itemB|       2|    3|       1|      0|
    #|articleC|       0|    1|       1|      0|
    #| objectD|       0|    0|       0|      1|
    #+--------+--------+-----+--------+-------+
    

    Note: depending on your actual requirement, F.count('*') may be enough in place of F.countDistinct('userId') in the final aggregation. If each userId appears on only one row of df_pivot, the two should give the same counts.