Tags: pyspark, apache-spark-sql, melt

Convert PySpark Dense Column Vectors into Rows


I have a DataFrame with 3 columns, and every entry is a dense vector of the same length. How can I melt the vector entries into rows?

Current DataFrame:

column1       | column2
[1.0,2.0,3.0] | [10.0,4.0,3.0]
[5.0,4.0,3.0] | [11.0,26.0,3.0]
[9.0,8.0,7.0] | [13.0,7.0,3.0]

Expected:

column1 | column2
1.0     | 10.0
2.0     | 4.0
3.0     | 3.0
5.0     | 11.0
4.0     | 26.0
3.0     | 3.0
9.0     | 13.0
...


Solution

  • Step 1: Let's create the initial DataFrame:

    myValues = [([1.0, 2.0, 3.0], [10.0, 4.0, 3.0]),
                ([5.0, 4.0, 3.0], [11.0, 26.0, 3.0]),
                ([9.0, 8.0, 7.0], [13.0, 7.0, 3.0])]
    df = sqlContext.createDataFrame(myValues, ['column1', 'column2'])
    df.show()
    +---------------+-----------------+
    |        column1|          column2|
    +---------------+-----------------+
    |[1.0, 2.0, 3.0]| [10.0, 4.0, 3.0]|
    |[5.0, 4.0, 3.0]|[11.0, 26.0, 3.0]|
    |[9.0, 8.0, 7.0]| [13.0, 7.0, 3.0]|
    +---------------+-----------------+
    
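    Note: createDataFrame over plain Python lists yields array<double> columns, which is what getItem() and explode() operate on in the next step. The question mentions dense vectors; if the real columns hold pyspark.ml.linalg.DenseVector values instead, they would have to be converted to arrays first. A minimal sketch, assuming Spark >= 3.0 (where vector_to_array is available in pyspark.ml.functions); vec_df and arr_df are illustrative names, not part of the original answer:

    from pyspark.ml.functions import vector_to_array
    from pyspark.ml.linalg import Vectors
    from pyspark.sql.functions import col

    # Hypothetical DataFrame whose columns really are DenseVectors.
    vec_df = sqlContext.createDataFrame(
        [(Vectors.dense([1.0, 2.0, 3.0]), Vectors.dense([10.0, 4.0, 3.0]))],
        ['column1', 'column2'])

    # Convert each vector column to array<double> so the steps below apply unchanged.
    arr_df = vec_df.select(
        vector_to_array(col('column1')).alias('column1'),
        vector_to_array(col('column2')).alias('column2'))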

    Step 2: Now explode both columns, but only after zipping the arrays element-wise into structs. Here we know beforehand that the length of each array is 3.

    from pyspark.sql.functions import array, col, explode, struct

    # Pair up the i-th elements of both columns as structs, then explode the
    # resulting array so that each pair becomes its own row.
    tmp = explode(array(*[
        struct(col("column1").getItem(i).alias("column1"),
               col("column2").getItem(i).alias("column2"))
        for i in range(3)
    ]))
    df = (df.withColumn("tmp", tmp)
            .select(col("tmp").getItem("column1").alias("column1"),
                    col("tmp").getItem("column2").alias("column2")))
    df.show()
    +-------+-------+
    |column1|column2|
    +-------+-------+
    |    1.0|   10.0|
    |    2.0|    4.0|
    |    3.0|    3.0|
    |    5.0|   11.0|
    |    4.0|   26.0|
    |    3.0|    3.0|
    |    9.0|   13.0|
    |    8.0|    7.0|
    |    7.0|    3.0|
    +-------+-------+
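
    The hard-coded range(3) in Step 2 ties the solution to arrays of length 3. A hedged alternative that avoids knowing the length in advance, assuming Spark >= 2.1 (where posexplode is available) and starting again from the two-array-column df built in Step 1 (before it was overwritten above); df2 is just an illustrative name:

    from pyspark.sql.functions import col, posexplode

    # posexplode yields each element of column1 together with its position;
    # that position is then used to look up the matching element of column2,
    # so the array length never has to be hard-coded.
    df2 = (df.select(posexplode(col("column1")).alias("pos", "column1"), col("column2"))
             .select("column1", col("column2")[col("pos")].alias("column2")))
    df2.show()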