Search code examples
python-3.xdataframepysparkjohnsnowlabs-spark-nlp

PySpark: Element-wise sum ALL DenseVectors in each cell of a dataframe


I need to element-wise sum ALL DenseVectors for each sentence. There can be multiple DenseVectors (per sentence) in each row's cell, i.e. embeddings. I'll explain with the help of multiple lists in a cell. Take the following example:

+----------------------------------+
|abc                               |
+----------------------------------+
|[[1.0, 2.0, 3.0], [1.2, 2.2, 3.2]]|
|[[2.0, 3.0, 4.0], [3.1, 4.1, 5.1]]|
|[[3.0, 4.0, 5.0], [4.3, 5.3, 6.3]]|
|[[4.0, 5.0, 6.0], [5.2, 4.2, 7.2]]|
+----------------------------------+

The output required is:

+-----------------+
|abc              |
+-----------------+
|[2.2, 4.2,  6.2]|
|[5.1, 7.1,  9.1]|
|[7.3, 9.3, 11.3]|
|[9.2, 9.2, 13.2]|
+-----------------+

I've tried a few ways on the following lines but nothing seems to work.

schema = StructType([
  StructField("abc", ArrayType(ArrayType(FloatType())), True)
])

test_data = [
    Row([[1.0, 2.0, 3.0], [1.2, 2.2, 3.2]]),
    Row([[2.0, 3.0, 4.0], [3.1, 4.1, 5.1]]),
    Row([[3.0, 4.0, 5.0], [4.3, 5.3, 6.3]]),
    Row([[4.0, 5.0, 6.0], [5.2, 4.2, 7.2]])
    ]

test_df = spark.createDataFrame(data=test_data, schema=schema)

test_df.groupBy("abc").agg(array([sum(col("abc")[i].getItem(j)) for j in range(3) for i in range(len(test_df.select('abc').first()[0]))]).alias("sum")).show(truncate=False)

test_df.groupBy("abc").agg(array(*[array(*[sum(col("abc")[i].getItem(j)) for j in range(3) for i in range(len(test_df.select('abc').first()[0]))])]).alias("sum")).show(truncate=False)

test_df.groupBy("abc").agg(array(*[array(*[sum(col("abc")[i].getItem(j)) for i in range(len(test_df.select('abc').first()[0]))]) for j in range(3) ]).alias("sum")).show(truncate=False)

Solution

  • You can use zip_with to do the element wise addition. Then, reduce to aggregate into 1 final array.

    df = (df.withColumn('abc', F.reduce(
             'abc', 
             F.array([F.lit(0.0), F.lit(0.0), F.lit(0.0)]), 
             lambda acc, x: F.zip_with(acc, x, lambda iacc, ix: iacc + ix))))
    

    ============================================

    UPDATE

    If your array size is fixed, you can change F.array part to create the fixed size array as

    F.array([F.lit(0.0) for x in range(1064)])
    

    and to your second question, if you are using Pyspark 3.1-<3.5, you can use aggregate. Function interface is same, so just change reduce to aggregate.

    If your array size is dynamic, you can try this to return the initial array within the aggregation function.

    df = (df.withColumn('abc',  F.aggregate(
             'abc', 
             F.array().cast('array<double>'), 
             lambda acc, x: (F.when(F.size(acc) < 1, x)
                             .otherwise(F.zip_with(acc, x, lambda iacc, ix: iacc + ix))))))