
PySpark - Concatenate DataFrame values only for the columns listed in another column of the same DataFrame


I'm struggling with this problem: there is a DataFrame with a couple of columns. One of these columns is "BKColumns", a column of ARRAY type that contains the names of some (not all) of the columns of the same DataFrame. Each row can list a different set of column names in "BKColumns". In the end, I would like to add another column to the DataFrame that contains the concatenation of the values from the columns listed in "BKColumns".

I will also describe it here with a simple example: [image]

Unfortunately, none of the commented-out lines below worked. They throw an error saying that the column is not iterable, or some other error. [image]


Solution

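  • Because the columns to concatenate vary from row to row, a single fixed column expression (such as concat_ws over a static list of columns) does not work directly. A straightforward way to handle this is a Python UDF.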

    from pyspark.sql import functions as F
    
    # spark_session is assumed to be an existing SparkSession
    df = spark_session.createDataFrame([
        (1, 2, 3, ['c1', 'c2', 'c3']),
        (3, 4, 5, ['c1', 'c2']),
        (6, 7, 8, ['c2']),
        (9, 10, 11, ['c1', 'c3'])
    ], ['c1', 'c2', 'c3', 'BKColumns'])
    
    df.show()
    
    # +---+---+---+------------+
    # | c1| c2| c3|   BKColumns|
    # +---+---+---+------------+
    # |  1|  2|  3|[c1, c2, c3]|
    # |  3|  4|  5|    [c1, c2]|
    # |  6|  7|  8|        [c2]|
    # |  9| 10| 11|    [c1, c3]|
    # +---+---+---+------------+
    
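    # F.udf returns a string column by default
    @F.udf
    def concat_bk_columns(c1, c2, c3, bk_columns):
        # Map each column name to its value in the current row
        d = {
            'c1': c1,
            'c2': c2,
            'c3': c3
        }
        # Join the values of the listed columns, in the order given in BKColumns
        return '_'.join(str(d[x]) for x in bk_columns if x in d)
    
    df = df.withColumn('result', concat_bk_columns(F.col('c1'), F.col('c2'), F.col('c3'), F.col('BKColumns')))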
    
    df.show()
    
    # +---+---+---+------------+------+
    # | c1| c2| c3|   BKColumns|result|
    # +---+---+---+------------+------+
    # |  1|  2|  3|[c1, c2, c3]| 1_2_3|
    # |  3|  4|  5|    [c1, c2]|   3_4|
    # |  6|  7|  8|        [c2]|     7|
    # |  9| 10| 11|    [c1, c3]|  9_11|
    # +---+---+---+------------+------+
    

    However, this is still not very dynamic: the column names must be known when writing the UDF, so it can't be applied to an arbitrary DataFrame as is.
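
One possible way to lift that restriction, offered as a sketch rather than a definitive solution: pack every column except BKColumns into a struct and pass that struct to the UDF, so the function no longer needs the column names in its signature. The helper names `concat_selected` and `with_bk_concat` are made up for illustration, and skipping null values (similar to what `concat_ws` does) is an assumption about the desired behavior.

```python
def concat_selected(values, selected, sep="_"):
    """Join the values of the selected keys, in the order they are listed.

    `values` maps column name -> value for one row; `selected` is that
    row's BKColumns list. Unknown names and None values are skipped
    (an assumption, not something the original answer specifies).
    """
    if not selected:
        return ""
    return sep.join(
        str(values[name])
        for name in selected
        if name in values and values[name] is not None
    )


def with_bk_concat(df, bk_col="BKColumns", out_col="result", sep="_"):
    """Add `out_col` to any DataFrame that has an array column `bk_col`."""
    # Imported here so concat_selected can be tested without Spark installed.
    from pyspark.sql import functions as F

    # Every column other than the array of names is a candidate for concatenation.
    data_cols = [c for c in df.columns if c != bk_col]

    @F.udf(returnType="string")
    def bk_concat(row, selected):
        # A struct column arrives in a Python UDF as a Row object.
        return concat_selected(row.asDict(), selected, sep)

    return df.withColumn(out_col, bk_concat(F.struct(*data_cols), F.col(bk_col)))
```

Applied to the original example DataFrame, `with_bk_concat(df).show()` should give the same `result` column as the hand-written UDF, without listing c1, c2 and c3 explicitly. It is still a Python UDF, so the usual serialization overhead applies.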