In Databricks I have N Delta tables, one per store, each listing that store's products with this schema:
store_1:
store | product | sku |
---|---|---|
1 | prod 1 | abc |
1 | prod 2 | def |
1 | prod 3 | ghi |
store_2:
store | product | sku |
---|---|---|
2 | prod 1 | abc |
2 | prod 10 | xyz |
2 | prod 23 | ghi |
I need to find every product from store 1 that also appears in store 2, matching on the SKU column, so I'm running a query like this:
```sql
select * from df_store_1 st1 join df_store_2 st2 on st1.sku = st2.sku
```
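Since the join matches on SKU alone, that should return these products (showing only the product and SKU columns):
store 1 product | store 2 product | sku |
---|---|---|
prod 1 | prod 1 | abc |
prod 3 | prod 23 | ghi |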
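As a quick sanity check of what a SKU join matches, here is a plain-Python sketch (no Spark involved) of the same inner join on the sample rows above. The `store_1`/`store_2` lists and the `sku_join` helper are illustrative stand-ins for the Delta tables, not part of the original question.

```python
# Sample rows from the question as (store, product, sku) tuples
store_1 = [(1, "prod 1", "abc"), (1, "prod 2", "def"), (1, "prod 3", "ghi")]
store_2 = [(2, "prod 1", "abc"), (2, "prod 10", "xyz"), (2, "prod 23", "ghi")]


def sku_join(left, right):
    """Inner join on sku, returning (left_product, right_product, sku)."""
    # Index the right side by sku, like the build side of a hash join
    by_sku = {}
    for _, product, sku in right:
        by_sku.setdefault(sku, []).append(product)
    return [
        (product, other, sku)
        for _, product, sku in left
        for other in by_sku.get(sku, [])
    ]


matches = sku_join(store_1, store_2)
print(matches)
# [('prod 1', 'prod 1', 'abc'), ('prod 3', 'prod 23', 'ghi')]
```

Because SKU `ghi` appears in both stores (as `prod 3` and `prod 23`), a join on SKU alone returns it too; if identical products must also share the product name, the join condition has to include `product` as well.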
I need to do this for every pair of stores. Since I'm a beginner with PySpark, I was thinking of creating a list of all possible store pairs and looping over them like this:
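```python
import pandas as pd
list_dfs = []
for store1, store2 in list_all_pairs_stores:  # (table_a, table_b) name tuples
    query = (f"select st1.product as product_1, st2.product as product_2, st1.sku "
             f"from {store1} st1 join {store2} st2 on st1.sku = st2.sku")
    list_dfs.append(spark.sql(query).toPandas())
# stack the per-pair results row-wise (axis=0), not side by side
all_equal_products = pd.concat(list_dfs, axis=0, ignore_index=True)
```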
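Here `list_all_pairs_stores` is assumed to hold every unordered pair of table names. A minimal sketch of building it with `itertools.combinations`, assuming the `store_1` ... `store_N` naming (the `all_store_pairs` helper and the five sample names are hypothetical), also shows how fast the per-pair approach grows: one query and one `toPandas()` call per pair, N(N-1)/2 pairs in total.

```python
from itertools import combinations


def all_store_pairs(table_names):
    """Every unordered pair of tables, each pair exactly once."""
    return list(combinations(table_names, 2))


# Hypothetical table names following the question's naming pattern
names = [f"store_{i}" for i in range(1, 6)]
list_all_pairs_stores = all_store_pairs(names)

print(list_all_pairs_stores[:3])
# [('store_1', 'store_2'), ('store_1', 'store_3'), ('store_1', 'store_4')]

# One query per pair: N * (N - 1) / 2 of them
for n in (5, 50, 500):
    print(n, n * (n - 1) // 2)
# 5 10
# 50 1225
# 500 124750
```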
What would be an efficient way to parallelize these queries?
Instead of running one query per pair and pulling each result to the driver with toPandas(), you can let PySpark build all the pairwise joins into a single DataFrame and execute them together.
Here is an example of how you can achieve this:
```python
from pyspark.sql.functions import col, lit

# Define a list of all the store table names ("..." stands for the rest)
store_names = ["store_1", "store_2", ... , "store_N"]

# Create a dictionary of DataFrames keyed by store name
# (spark is the SparkSession a Databricks notebook provides)
store_dfs = {name: spark.table(name) for name in store_names}

# Join two stores on sku. Aliasing the inputs avoids ambiguous column
# references; the generic output names plus the store labels keep rows
# from different pairs distinguishable after the union.
def join_identical_products(store1, store2):
    a = store_dfs[store1].alias("a")
    b = store_dfs[store2].alias("b")
    return a.join(b, col("a.sku") == col("b.sku")).select(
        lit(store1).alias("store_a"),
        lit(store2).alias("store_b"),
        col("a.product").alias("product_a"),
        col("b.product").alias("product_b"),
        col("a.sku").alias("sku"),
    )

# Generate every unordered pair of stores (each pair appears once)
store_pairs = [(store1, store2) for i, store1 in enumerate(store_names) for store2 in store_names[i+1:]]

# Union the pairwise joins into one lazily evaluated DataFrame;
# unionByName matches columns by name rather than by position
all_equal_products = None
for store1, store2 in store_pairs:
    temp_df = join_identical_products(store1, store2)
    if all_equal_products is None:
        all_equal_products = temp_df
    else:
        all_equal_products = all_equal_products.unionByName(temp_df)
```
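This code defines a list of all the store names and builds a dictionary of DataFrames keyed by store name. The join_identical_products function takes two store names and joins the corresponding tables on the sku column.

It then generates every unordered pair of stores with a list comprehension and loops over the pairs, joining each one and unioning the result into a single DataFrame called all_equal_products. Because Spark evaluates lazily, the loop only builds a query plan: nothing runs until you trigger an action, such as writing all_equal_products to a table or calling display() on it. At that point Spark plans all the joins together and distributes the work across the cluster, instead of running a separate job per pair and collecting each result to the driver with toPandas().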
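If the goal is literally one query, a common alternative (not what the code above does) is to stack all the store tables into one and self-join it on `sku`, keeping only rows where `store_a < store_b` so each store pair is produced once. That keeps a single join in the plan instead of N(N-1)/2 unioned ones. Below is a plain-Python sketch of that logic on the sample rows; `all_rows` and `shared_skus` are illustrative names, not Spark APIs.

```python
from collections import defaultdict

# All stores stacked into one table of (store, product, sku) rows,
# the plain-Python analogue of unioning every store table
all_rows = [
    (1, "prod 1", "abc"), (1, "prod 2", "def"), (1, "prod 3", "ghi"),
    (2, "prod 1", "abc"), (2, "prod 10", "xyz"), (2, "prod 23", "ghi"),
]


def shared_skus(rows):
    """Self-join on sku, keeping each store pair once (store_a < store_b)."""
    by_sku = defaultdict(list)
    for store, product, sku in rows:
        by_sku[sku].append((store, product))
    result = []
    for sku, entries in by_sku.items():
        for store_a, product_a in entries:
            for store_b, product_b in entries:
                if store_a < store_b:
                    result.append((store_a, store_b, product_a, product_b, sku))
    return result


for row in shared_skus(all_rows):
    print(row)
# (1, 2, 'prod 1', 'prod 1', 'abc')
# (1, 2, 'prod 3', 'prod 23', 'ghi')
```

In PySpark the same shape would be a `unionByName` over the store tables (the schema already carries a `store` column), then aliasing the combined DataFrame twice and joining it to itself on equal `sku` and `store_a < store_b`.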