
PySpark Combine Rows to Columns StackOverFlow Error


What I want (very simplified):

Input Dataset to Output dataset

Some of the code I tried:

from pyspark.sql import functions as F, types as T
from pyspark.sql import Window

def add_columns(cur_typ, target, value):
    # Keep the value only for rows whose typeT matches the target column.
    if cur_typ == target:
        return value
    return None

schema = T.StructType([T.StructField("name", T.StringType(), True),
                       T.StructField("typeT", T.StringType(), True),
                       T.StructField("value", T.IntegerType(), True)])
data = [("x", "a", 3), ("x", "b", 5), ("x", "c", 7), ("y", "a", 1), ("y", "b", 2),
        ("y", "c", 4), ("z", "a", 6), ("z", "b", 2), ("z", "c", 3)]
# ctx is my own context object that holds the SparkSession.
df = ctx.spark_session.createDataFrame(ctx.spark_session.sparkContext.parallelize(data), schema)
targets = [i.typeT for i in df.select("typeT").distinct().collect()]
# Declare the return type; without it the UDF returns strings and max() compares them as text.
add_columns = F.udf(add_columns, T.IntegerType())
w = Window.partitionBy('name')
for target in targets:
    df = df.withColumn(target, F.max(F.lit(add_columns(df["typeT"], F.lit(target), df["value"]))).over(w))
df = df.drop("typeT", "value").dropDuplicates()

Another version, without the UDF:

targets = df.select(F.collect_set("typeT").alias("typeT")).first()["typeT"]

w = Window.partitionBy('name')

for target in targets:
    # max() wraps the conditional column; the window is applied to the aggregate.
    df = df.withColumn(target, F.max(F.when(df["typeT"] == F.lit(target), df["value"])
                                      .otherwise(None)).over(w))

df = df.drop("typeT", "value").dropDuplicates()

For small datasets both versions work, but I have a dataframe with 1 million rows and 5000 different typeTs, so the result should be a table of about 500 x 5000 (some names do not have certain typeTs). Now I get stack overflow errors (py4j.protocol.Py4JJavaError: An error occurred while calling o7624.withColumn. : java.lang.StackOverflowError) when trying to create this dataframe. Besides increasing the stack size, what can I do? Is there a better way to get the same result?


Solution

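  • Calling withColumn in a loop does not scale when many columns have to be added: each call adds another projection to the query plan, and with thousands of columns the plan gets deep enough to cause the StackOverflowError.

    Instead, build a list of column expressions and pass them to a single select, which keeps the plan shallow and performs better: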

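    # Build every output column up front, then add them all in one select.
    cols = [F.col("name")]
    for target in targets:
        cols.append(F.max(F.lit(add_columns(df["typeT"], F.lit(target), df["value"]))).over(w).alias(target))
    # The window keeps one row per input row, so drop the repeated rows per name.
    df = df.select(cols).dropDuplicates()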
    

    This produces the same output:

    +----+---+---+---+
    |name|  c|  b|  a|
    +----+---+---+---+
    |   x|  7|  5|  3|
    |   z|  3|  2|  6|
    |   y|  4|  2|  1|
    +----+---+---+---+