Tags: apache-spark, pyspark, nested-loops

pyspark nested for loops


I have seen many questions of the same nature, but I am still confused: some say to use groupBy, others propose map or flatMap. I am not sure what to try.

Input: a PySpark DF with date, term, brand and text columns. Expected output: a list of filtered DFs [df1, df2, ...]

In Python, the code looks like this (df is a big dataframe with about 10,000 records):

filtered_df_list = []

for term in term_list:
  for br in brand_list:
    for dt in date_list:
      tcd_df = df.filter((df.term == term) & (df.brand == br) & (df.date == dt))
      # keep only the non-empty results
      if tcd_df.count() > 0:
        filtered_df_list.append(tcd_df)

Also, can I create a dataframe out of the term, brand and date lists and use withColumn to add a column holding the filtered DFs? I am not sure. Starter code for parallelizing this would be helpful.

Update: this is what I tried, and it looks like the worker nodes are trying to use the SparkContext to do the filtering, which is not allowed.

rdd2 = df.rdd.map(lambda row: (
    row["date"], row["brand"], row["term"], row["text"],
    df.filter((df.brand == row["brand"]) & (df.term == row["term"]) & (df.date == row["date"]))
))

print("\n\nAfter loop")
df2 = rdd2.toDF(["date", "brand", "term", "text"])
df2.show()

Error (more or less what I expected): _pickle.PicklingError: Could not serialize object: RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.


Solution

  • Here's a suggestion for the concatenation part:

    from pyspark.sql import functions as f

    agg_df = df.groupby("date", "brand", "term").agg(
        f.concat_ws(":", f.collect_list(df.text)).alias("text")
    )
    

    This will give you a df with one row per "date", "brand", "term" combination and all of the text concatenated into a single column (replace ":" with whichever separator you prefer).

    If you want to split them into different dfs, I recommend writing them out to a partitioned parquet folder, using the concatenation of "date", "brand" and "term" as your partition column, as sketched below.
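
    A minimal sketch of that write, assuming the aggregated dataframe from above is named agg_df and using a placeholder output path:

    # Build a single partition key from the three columns, as described above,
    # then write one folder per key value.
    agg_df = agg_df.withColumn("partition_key", f.concat_ws("_", "date", "brand", "term"))
    agg_df.write.mode("overwrite").partitionBy("partition_key").parquet("/tmp/text_by_partition")

    (Calling partitionBy("date", "brand", "term") directly gives a nested folder layout that also splits the data per combination, if you prefer to keep the columns separate.)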

    Writing the data out persists it (handy for debugging) and lets you run your text summarization on each partition in parallel.
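
    Later, each combination can be read back on its own for the summarization step; a sketch, reusing the imports above, with the path and key value as placeholders:

    # Filtering on the partition column lets Spark prune the other folders.
    one_group = (
        spark.read.parquet("/tmp/text_by_partition")
        .filter(f.col("partition_key") == "2020-01-01_brandA_termX")
    )
    one_group.show()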