pyspark, databricks, databricks-sql, pyspark-pandas

Execute query in parallel over a list of rows in pyspark


In Databricks I have N Delta tables of stores with their products, each with this schema:

store_1:

store  product  sku
1      prod 1   abc
1      prod 2   def
1      prod 3   ghi

store_2:

store  product  sku
2      prod 1   abc
2      prod 10  xyz
2      prod 23  ghi

I need to find every identical product from store 1 in store 2, using the SKU column, so I'm running a query like this:

select * from store_1 st1 join store_2 st2 on st1.sku = st2.sku

That should return this product:

product  sku
prod 1   abc

I need to do this for all the stores and products, and since I am a beginner with PySpark I was thinking of creating a list of all the possible pairs of stores and traversing them like this:

import pandas as pd

list_dfs = []

for store1, store2 in list_all_pairs_stores:
    temp_df = spark.sql(f"select st1.product as product_1, st2.product as product_2, st1.sku "
                        f"from {store1} st1 join {store2} st2 on st1.sku = st2.sku").toPandas()
    list_dfs.append(temp_df)

# Stack the per-pair results row-wise
all_equal_products = pd.concat(list_dfs, axis=0)

What would be an efficient way to parallelize these queries?


Solution

  • Instead of running a separate query per pair and collecting each result to pandas, you can keep everything in PySpark: build the join for every pair as a DataFrame and union the results, so Spark executes all of the joins as one parallel job.

    Here is an example of how you can achieve this:

    from pyspark.sql.functions import col, lit

    # Define a list of all the store names
    store_names = ["store_1", "store_2", ... , "store_N"]

    # Create a dictionary of DataFrames, keyed by store name
    store_dfs = {}
    for store_name in store_names:
        store_dfs[store_name] = spark.table(store_name)

    # Join one pair of stores on sku. Aliasing each side makes the
    # qualified column references unambiguous, and the store-agnostic
    # output names give every pair the same schema, so the union
    # further down lines up correctly.
    def join_identical_products(store1, store2):
        df1 = store_dfs[store1].alias("st1")
        df2 = store_dfs[store2].alias("st2")
        return df1.join(df2, on="sku").select(
            lit(store1).alias("store_a"),
            lit(store2).alias("store_b"),
            col("st1.product").alias("product_a"),
            col("st2.product").alias("product_b"),
            col("sku"),
        )
    
    # Generate all pairs of stores
    store_pairs = [(store1, store2) for i, store1 in enumerate(store_names) for store2 in store_names[i+1:]]
    
    # Perform the join operation on all pairs of stores
    all_equal_products = None
    for store1, store2 in store_pairs:
        temp_df = join_identical_products(store1, store2)
        if all_equal_products is None:
            all_equal_products = temp_df
        else:
            all_equal_products = all_equal_products.union(temp_df)
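
    Note that join, select, and union are lazy transformations, so the loop above only assembles a logical plan; nothing has executed yet. Spark runs the whole plan, with the joins distributed across the cluster, once an action is invoked, for example writing the combined result out (the output path below is just illustrative):

    # Trigger execution: all the pairwise joins and unions run as one
    # parallel Spark job when an action such as write is invoked.
    all_equal_products.write.format("delta").mode("overwrite").save("/tmp/all_equal_products")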
    

    Walking through the solution: it defines a list of all the store names and builds a dictionary of DataFrames keyed by store name. The join_identical_products function takes two store names, joins the corresponding DataFrames on the sku column, and tags each row with the pair of stores it came from.

    It then generates all unordered pairs of stores with a list comprehension and unions the per-pair results into a single DataFrame called all_equal_products.
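
    As a side note: if every per-store table shares the store/product/sku schema shown above, another common pattern is to union all the stores into one DataFrame and self-join it on sku. The sketch below assumes identical schemas across the tables; it finds matches across every pair of stores in a single shuffle instead of N*(N-1)/2 separate joins:

    from functools import reduce

    from pyspark.sql import DataFrame
    from pyspark.sql.functions import col

    # Stack all the stores into one DataFrame (assumes identical schemas).
    all_stores = reduce(DataFrame.unionByName, store_dfs.values())

    # Self-join on sku; the inequality keeps each pair of stores once and
    # prevents a product from matching itself within the same store.
    matches = (
        all_stores.alias("a")
        .join(all_stores.alias("b"), on="sku")
        .where(col("a.store") < col("b.store"))
        .select(
            col("a.store").alias("store_a"),
            col("b.store").alias("store_b"),
            col("a.product").alias("product_a"),
            col("b.product").alias("product_b"),
            col("sku"),
        )
    )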