Tags: python, apache-spark, pyspark, multiple-columns, rename

How to rename specific columns in PySpark?


I have a DataFrame in PySpark that is the result of a groupBy with agg, like this:

df1 = df.groupBy(['data', 'id']).pivot('type').agg(F.sum('value').alias("Values"), F.count('value').alias("Quantity"))

But I need to put the aliases ("Values" and "Quantity") as the prefix of these columns, not as the suffix.

Here is an example of the dataframe.

Result of my script:

data        id    some_type_Values  some_type_Quantity
2022-01-01  1234  12.50             2

Desired output:

data        id    Values some_type  Quantity some_type
2022-01-01  1234  12.50             2

What I've tried so far:

from pyspark.sql.functions import col

selected = df1.select([s for s in df1.columns if 'Values' in s])
select_volume = [col(col_name).alias("Values " + col_name) for col_name in selected.columns]
df2 = df1.select(*select_volume)

This works, but it splits my dataframe (only the Values columns are selected), and I would still need to strip the _Values and _Quantity suffix from the end of each column name.

How can I rename the selected columns for each aggregation and also remove the alias from the end of each one?


Solution

  • Python's str.rfind may be useful here.
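
    For instance, given a pivoted column name such as 'some_type_1_Values' (the kind of name the pivot below produces), rfind locates the last underscore, so the name can be split into the aggregation alias and the pivoted type value:

    c = 'some_type_1_Values'
    c[c.rfind('_')+1:]   # 'Values'      -> the aggregation alias
    c[:c.rfind('_')]     # 'some_type_1' -> the pivoted type value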

    Example dataframes:

    from pyspark.sql import functions as F
    df = spark.createDataFrame(
        [('2022-01-01', 1234, 'some_type_1', 2),
         ('2022-01-01', 1234, 'some_type_2', 3)],
        ['data', 'id', 'type', 'value'])
    df1 = df.groupBy(['data', 'id']).pivot('type').agg(F.sum('value').alias("Values"), F.count('value').alias("Quantity"))
    df1.show()
    # +----------+----+------------------+--------------------+------------------+--------------------+
    # |      data|  id|some_type_1_Values|some_type_1_Quantity|some_type_2_Values|some_type_2_Quantity|
    # +----------+----+------------------+--------------------+------------------+--------------------+
    # |2022-01-01|1234|                 2|                   1|                 3|                   1|
    # +----------+----+------------------+--------------------+------------------+--------------------+
    

    Script for renaming:

    df1 = df1.select(
        *['data', 'id'],
        *[F.col(c).alias(f"{c[c.rfind('_')+1:]} {c[:c.rfind('_')]}") for c in df1.columns if c not in ['data', 'id']]
    )
    df1.show()
    # +----------+----+------------------+--------------------+------------------+--------------------+
    # |      data|  id|Values some_type_1|Quantity some_type_1|Values some_type_2|Quantity some_type_2|
    # +----------+----+------------------+--------------------+------------------+--------------------+
    # |2022-01-01|1234|                 2|                   1|                 3|                   1|
    # +----------+----+------------------+--------------------+------------------+--------------------+
    

    toDF is also possible and less verbose, but it can be more error-prone in some cases, because the new names are matched to the existing columns purely by position.

    df1 = df1.toDF(
        *['data', 'id'],
        *[f"{c[c.rfind('_')+1:]} {c[:c.rfind('_')]}" for c in df1.columns if c not in ['data', 'id']]
    )
    df1.show()
    # +----------+----+------------------+--------------------+------------------+--------------------+
    # |      data|  id|Values some_type_1|Quantity some_type_1|Values some_type_2|Quantity some_type_2|
    # +----------+----+------------------+--------------------+------------------+--------------------+
    # |2022-01-01|1234|                 2|                   1|                 3|                   1|
    # +----------+----+------------------+--------------------+------------------+--------------------+
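
    Because of that positional matching, a safer variant (just a minimal sketch using the same rfind logic) is to derive the full list of new names from df1.columns itself, so the order can never drift from the actual columns:

    new_names = [
        c if c in ['data', 'id']
        else f"{c[c.rfind('_')+1:]} {c[:c.rfind('_')]}"
        for c in df1.columns
    ]
    df1 = df1.toDF(*new_names)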