arrays · apache-spark · pyspark · apache-spark-sql · aggregate

Statistics from nested array using PySpark


I want to perform some aggregations on this table using PySpark. My table, COMPLEX_DATA, is stored in Snowflake, and the COORDINATES column's datatype is VARIANT. The nested arrays are not of fixed size: a row could have 1,000 nested arrays, and other rows have none. My table is shown below:

| ID |         COORDINATES                  |
|----|--------------------------------------|
| 1  |[[0, 15], [1, 50], [2, 5], [5, 650]]  |
| 2  |[[0, 20], [1, 17]]                    |
| 3  |[[0, 10], [1, 15], [2, 11]]           |

The output should look like the following:

| Index| Min | Max | Mean |
|------|-----|-----|------|
| 0    |  10 |  20 | 15   |
| 1    |  15 |  50 | 27.33| 
| 2    |  5  |  11 | 8    |
| 5    |  650|  650| 650  |

Solution

  • Let's assume that when you import this table into Spark, the COORDINATES column will be of array type. Based on that assumption, the code below explodes the array and then groups by Index:

    from pyspark.sql import functions as F

    data = [(1, [[0, 15], [1, 50], [2, 5], [5, 650]]), (2, [[0, 20], [1, 17]]), (3, [[0, 10], [1, 15], [2, 11]])]
    df = spark.createDataFrame(data).toDF("ID", "COORDINATES")
    # explode each [index, value] pair into its own row, then aggregate per index
    (df.withColumn("pair", F.explode("COORDINATES"))
       .select(F.col("pair")[0].alias("Index"), F.col("pair")[1].alias("value"))
       .groupBy("Index").agg(F.min("value").alias("Min"), F.max("value").alias("Max"), F.mean("value").alias("Mean"))
       .show())
    
    #output
    +-----+---+---+------------------+
    |Index|Min|Max|              Mean|
    +-----+---+---+------------------+
    |    0| 10| 20|              15.0|
    |    1| 15| 50|27.333333333333332|
    |    5|650|650|             650.0|
    |    2|  5| 11|               8.0|
    +-----+---+---+------------------+
    

    Also, from the data you have provided, the minimum for index 0 should be 10, not 15, since for ID = 3 the COORDINATES contain [0, 10].
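
    Since COORDINATES is a VARIANT column in Snowflake, it may come through the Spark-Snowflake connector as a JSON string rather than a native array. The sketch below shows one way to handle that case: read the table with the connector and parse the string into array<array<int>> with from_json, after which the explode/groupBy code above applies unchanged. The connection options (sfURL, sfUser, etc.) are placeholders, not values from the question.

    from pyspark.sql import functions as F
    from pyspark.sql.types import ArrayType, IntegerType

    # Hypothetical connection options -- replace with your own account details
    sf_options = {
        "sfURL": "<account>.snowflakecomputing.com",
        "sfUser": "<user>", "sfPassword": "<password>",
        "sfDatabase": "<db>", "sfSchema": "<schema>", "sfWarehouse": "<wh>",
    }
    raw = (spark.read.format("net.snowflake.spark.snowflake")
           .options(**sf_options)
           .option("dbtable", "COMPLEX_DATA")
           .load())

    # VARIANT columns typically arrive as JSON strings; parse into array<array<int>>
    df = raw.withColumn(
        "COORDINATES",
        F.from_json(F.col("COORDINATES").cast("string"),
                    ArrayType(ArrayType(IntegerType()))))
    # df now has the array-typed COORDINATES column assumed by the code above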