
How to replace a column name contained in another column with that column's value using PySpark?


I have a PySpark DataFrame as below:

ID    col1   col2        colA  colB
id_1  %colA  t < %colA   int1  int3
id_2  %colB  t < %colB   int2  int4
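
(For reference, a minimal snippet to reproduce this sample DataFrame; all columns are assumed to be strings:)

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # sample rows taken from the table above
    df = spark.createDataFrame(
        [
            ("id_1", "%colA", "t < %colA", "int1", "int3"),
            ("id_2", "%colB", "t < %colB", "int2", "int4"),
        ],
        ["ID", "col1", "col2", "colA", "colB"],
    )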

I want to replace each string starting with % with the value of the column it names, as below.

ID    col1  col2
id_1  int1  t < int1
id_2  int4  t < int4

Maybe it is possible if I loop through the rows.

But is there an efficient way to do this?


Solution

  • You can pack all of a row's columns into a single map<string, string> column (serialize the row with to_json(struct(*)) and parse it back with from_json), then look up each referenced value in that map by column name, like below.

    from pyspark.sql.functions import expr

    (
        df
        # pack every column of the row into a map<string,string> keyed by column name
        .withColumn("mp", expr("from_json(to_json(struct(*)), 'map<string,string>')"))
        # split e.g. 't < %colA' into ['t < ', 'colA']: literal prefix and column name
        .withColumn("col1", expr("split(col1, '%')"))
        .withColumn("col2", expr("split(col2, '%')"))
        .selectExpr(
            "ID",
            # stitch the prefix back together with the looked-up value
            "concat(col1[0], mp[col1[1]]) as col1",
            "concat(col2[0], mp[col2[1]]) as col2",
        )
        .show(truncate=False)
    )
    
    +----+----+--------+
    |ID  |col1|col2    |
    +----+----+--------+
    |id_1|int1|t < int1|
    |id_2|int4|t < int4|
    +----+----+--------+
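
  • Alternatively, if you prefer the DataFrame API over SQL expression strings, you can build the lookup map with create_map instead of the to_json/from_json round trip. A minimal sketch, assuming the lookup column names (here colA and colB) are known up front:

    from itertools import chain
    from pyspark.sql import functions as F

    # build a map column {'colA': colA, 'colB': colB} from the lookup columns
    lookup_cols = ["colA", "colB"]
    mp = F.create_map(*chain.from_iterable((F.lit(c), F.col(c)) for c in lookup_cols))

    (
        df.withColumn("mp", mp)
        .withColumn("col1", F.split("col1", "%"))
        .withColumn("col2", F.split("col2", "%"))
        .select(
            "ID",
            F.concat(F.col("col1")[0], F.col("mp")[F.col("col1")[1]]).alias("col1"),
            F.concat(F.col("col2")[0], F.col("mp")[F.col("col2")[1]]).alias("col2"),
        )
        .show(truncate=False)
    )

    Either way the replacement is a per-row map lookup evaluated by Spark, so no explicit loop over the rows is needed.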