
pyspark - fold and sum with ArrayType column


I'm trying to do an element-wise sum over an array column, and I've created this dummy df. The expected output is [10, 4, 4, 1].

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
data = [
    ("James",[1,1,1,1]),
    ("James",[2,1,1,0]),
    ("James",[3,1,1,0]),
    ("James",[4,1,1,0])
  ]

schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("scores", ArrayType(IntegerType()), True)
])

df = spark.createDataFrame(data=data, schema=schema)

posexplode works (see the sketch at the end of this question), but my real df is too large, so I'm trying to use fold instead. However, it gives me an error. Any ideas? Thanks!

vec_df = df.select("scores")
vec_sums = vec_df.rdd.fold([0]*4, lambda a,b: [x + y for x, y in zip(a, b)])

File "<ipython-input-115-9b470dedcfef>", line 2, in <listcomp>

TypeError: unsupported operand type(s) for +: 'int' and 'list'
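
For reference, the posexplode approach that works (but is too slow on my real data) looks roughly like this (a sketch; my actual column names differ):

from pyspark.sql import functions as F

# Explode each array with its position, then sum the values per position.
exploded = df.select(F.posexplode("scores").alias("pos", "score"))
exploded.groupBy("pos").agg(F.sum("score").alias("total")).orderBy("pos").show()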


Solution

  • You need to map the RDD of rows to an RDD of lists before fold:

    vec_sums = vec_df.rdd.map(lambda x: x[0]).fold([0]*4, lambda a,b: [x + y for x, y in zip(a, b)])
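
    With the sample data above, this produces the expected result:

    >>> vec_sums
    [10, 4, 4, 1]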
    

    To aid understanding, here is what the RDDs look like:

    >>> vec_df.rdd.collect()
    [Row(scores=[1, 1, 1, 1]), Row(scores=[2, 1, 1, 0]), Row(scores=[3, 1, 1, 0]), Row(scores=[4, 1, 1, 0])]
    
    >>> vec_df.rdd.map(lambda x: x[0]).collect()
    [[1, 1, 1, 1], [2, 1, 1, 0], [3, 1, 1, 0], [4, 1, 1, 0]]
    

    So you can see that vec_df.rdd contains Row objects rather than plain lists. Since a Row is a tuple of its fields, zipping the zero value [0]*4 against a Row pairs the first 0 with the Row's single field (the whole list), so the list comprehension evaluates 0 + [1, 1, 1, 1] and raises TypeError: unsupported operand type(s) for +: 'int' and 'list'. Mapping with lambda x: x[0] unwraps each Row into its list before fold.
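
    As an aside, if the array length is fixed and known up front (4 here), you can stay in the DataFrame API and avoid both explode and the RDD. A sketch, assuming the same df as above:

    from pyspark.sql import functions as F

    # Sum each array position as its own aggregate column,
    # then read the single result row back as a list.
    n = 4  # known array length
    row = df.select([F.sum(F.col("scores")[i]).alias(f"p{i}") for i in range(n)]).first()
    vec_sums = list(row)  # [10, 4, 4, 1]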