Dynamically create columns from string with delimiter in Spark


I have a table like this:

a |  a_vector                               |  
1 | 710.83;-776.98;-10.86;2013.02;-896.28;  |  
2 | 3  ; 2  ;  1                            | 

Using PySpark or pandas, how do I dynamically create columns so that the first value in each vector goes to "col1", the second to "col2", and so on, and also calculate the sum?

a |  a_vector                               |  col1 | col2   | col3
1 | 300;-200;2022;                          |  300  |  -200  | 2022
2 | 3  ; 2  ;  1                            |   3   |  2     | 1

The final requirement is to have the sums of the new columns sorted in a single column.


Solution

  • In PySpark, you can do this by first splitting the string on ; (which creates an array) and then selecting the array elements as columns with a list comprehension. The per-row sum can be calculated with the aggregate higher-order function (available in pyspark.sql.functions since Spark 3.1).

    Input:

    from pyspark.sql import functions as F
    df = spark.createDataFrame(
        [('1', '300;-200;2022'),
         ('2', '3  ; 2  ;   1')],
        ['a', 'a_vector']
    )
    

    Script:

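    # Split on ';' and swallow any whitespace around it (raw string keeps the regex escapes intact).
    col_arr = F.split('a_vector', r'\s*;\s*')
    # The longest array in the data decides how many columns to create.
    max_size = df.agg(F.max(F.size(col_arr))).head()[0]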
    df = df.select(
        '*',
        *[col_arr[i].alias(f'col{i + 1}') for i in range(max_size)],
        F.aggregate(col_arr, F.lit(0.0), lambda acc, x: acc + x).alias('sum')
    )
    
    df.show()
    # +---+-------------+----+----+----+------+
    # |  a|     a_vector|col1|col2|col3|   sum|
    # +---+-------------+----+----+----+------+
    # |  1|300;-200;2022| 300|-200|2022|2122.0|
    # |  2|3  ; 2  ;   1|   3|   2|   1|   6.0|
    # +---+-------------+----+----+----+------+
    

    Based on the comments, the following is probably what you need: it determines max_size (the largest number of array elements), calculates the column-wise sums, and sorts the results.

    col_arr = F.split('a_vector', r'\s*;\s*')
    max_size = df.agg(F.max(F.size(col_arr))).head()[0]
    # Sum each array position across all rows, then sort the sums in ascending order.
    df = df.agg(
        F.array_sort(F.array([F.sum(col_arr[i]) for i in range(max_size)])).alias('sum')
    )
    
    df.show(truncate=0)
    # +-----------------------+
    # |sum                    |
    # +-----------------------+
    # |[-198.0, 303.0, 2023.0]|
    # +-----------------------+
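
The question also mentions pandas, so here is a minimal sketch of the same idea in pandas, assuming the data fits in memory on one machine. The names pdf, parts, values and sorted_sums are illustrative and do not come from the original answer. Unlike the PySpark script above, this sketch also drops a trailing ; before splitting, because the question's sample data has one, and it turns any piece that is not a number into NaN.

```python
import pandas as pd

# Same sample rows as the PySpark input above (pdf is a hypothetical name).
pdf = pd.DataFrame({
    'a': ['1', '2'],
    'a_vector': ['300;-200;2022', '3  ; 2  ;   1'],
})

# Drop outer whitespace and any trailing ';', then split on ';'.
# expand=True yields one column per position; shorter rows are padded with missing values.
parts = (
    pdf['a_vector']
    .str.strip()
    .str.rstrip(';')
    .str.split(';', expand=True)
)

# Strip whitespace around each piece and convert to numbers.
# errors='coerce' is an assumption: empty or non-numeric pieces become NaN.
values = parts.apply(lambda s: pd.to_numeric(s.str.strip(), errors='coerce'))
values.columns = [f'col{i + 1}' for i in range(values.shape[1])]

# Per-row result: original columns, the new columns, and the row sum.
result = pdf.join(values)
result['sum'] = values.sum(axis=1)

# Column-wise sums, sorted in ascending order.
sorted_sums = sorted(values.sum().tolist())
print(result)
print(sorted_sums)  # [-198, 303, 2023]
```

Here values.sum(axis=1) plays the role of the aggregate call, and sorted(values.sum()) corresponds to array_sort over the per-position sums. By default, pandas skips NaN when summing, so rows with fewer elements still get a sum.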