I'm trying to do an element-wise sum, and I've created this dummy df. The output should be [10,4,4,1]
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, ArrayType
data = [
("James",[1,1,1,1]),
("James",[2,1,1,0]),
("James",[3,1,1,0]),
("James",[4,1,1,0])
]
schema = StructType([ \
StructField("firstname",StringType(),True), \
StructField("scores", ArrayType(IntegerType()), True) \
])
df = spark.createDataFrame(data=data,schema=schema)
posexplode works, but my real df is too large so I'm trying to use fold, but it gives me an error. Any ideas? Thanks!
vec_df = df.select("scores")
vec_sums = vec_df.rdd.fold([0]*4, lambda a,b: [x + y for x, y in zip(a, b)])
File "<ipython-input-115-9b470dedcfef>", line 2, in <listcomp>
TypeError: unsupported operand type(s) for +: 'int' and 'list'
You need to map the RDD of rows to an RDD of lists before fold
:
vec_sums = vec_df.rdd.map(lambda x: x[0]).fold([0]*4, lambda a,b: [x + y for x, y in zip(a, b)])
To help understanding, you can see how the RDDs look like.
>>> vec_df.rdd.collect()
[Row(scores=[1, 1, 1, 1]), Row(scores=[2, 1, 1, 0]), Row(scores=[3, 1, 1, 0]), Row(scores=[4, 1, 1, 0])]
>>> vec_df.rdd.map(lambda x: x[0]).collect()
[[1, 1, 1, 1], [2, 1, 1, 0], [3, 1, 1, 0], [4, 1, 1, 0]]
So you can imagine that vec_df.rdd
contains a nested list, which needs to be unnested before fold
.