Tags: python, apache-spark, pyspark, apache-spark-sql, hierarchy

Reverse the hierarchy order in PySpark


I need to create a DataFrame that contains the hierarchy of employees. The first three columns "Name", "Position", and "Age" should stay as is, but the remaining columns should be reversed according to the hierarchy.

In the input, Raj reports to "Pos1", which reports to "Pos2", which reports to "Pos3", and so on. We can have positions up to "Pos30". If an employee has just 2 levels of hierarchy, the subsequent columns contain null values.

For the output, we need to reverse the hierarchy: the value of Pos3 should move into Pos1, and likewise for its corresponding Brid and Emid columns.
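To make the transformation concrete, here is one hypothetical row sketched as plain Python dicts (only the Pos columns shown; Brid and Emid move with them):

```python
# Input row: Raj reports up through three levels.
row_in = {
    "Name": "Raj", "Position": "Trainee", "Age": 30,
    "Pos1": "Associate", "Pos2": "Consultant", "Pos3": "Sr Consultant",
}

# Expected output: the hierarchy columns are reversed,
# so the topmost level (Pos3) lands in Pos1.
row_out = {
    "Name": "Raj", "Position": "Trainee", "Age": 30,
    "Pos1": "Sr Consultant", "Pos2": "Consultant", "Pos3": "Associate",
}

print(row_out["Pos1"])  # → Sr Consultant
```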



Solution

  • This is a complex but efficient approach for your 30 sets of columns:

    • Spark 3.1+

      from pyspark.sql import functions as F

      cols = ['Pos', 'Brid', 'Emid']
      groups = [F.array(df.colRegex(fr"`({'|'.join(cols)}){i}$`")) for i in range(30, 0, -1)]
      filtered = F.filter(F.array(*groups), lambda x: F.exists(x, lambda y: ~(F.isnull(y) | (y == F.lit("")))))
      df = df.select(
          "Name", "Position", "Age",
          *[filtered[x][i].alias(f"{c}{x+1}") for x in range(30) for i, c in enumerate(cols)],
          F.size(filtered).alias("Hierarchy_level")
      )
      
    • Spark 2.4+

      cols = ['Pos', 'Brid', 'Emid']
      groups = [F.array(df.colRegex(fr"`({'|'.join(cols)}){i}$`")) for i in range(30, 0, -1)]
      df = df.withColumn('_temp', F.array(*groups))
      filtered = F.expr("filter(_temp, x -> exists(x, y -> !(isnull(y) or (y = ''))))")
      df = df.select(
          "Name", "Position", "Age",
          *[filtered[x][i].alias(f"{c}{x+1}") for x in range(30) for i, c in enumerate(cols)],
          F.size(filtered).alias("Hierarchy_level")
      )
      
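    The core idea of both snippets can be sketched in plain Python, without Spark: group the (Pos, Brid, Emid) triples, reverse the groups, drop the all-null/empty ones, and re-emit the rest left-aligned, padded with None. This is a minimal sketch with a hypothetical `reverse_hierarchy` helper, using the sample values from the test below:

```python
def reverse_hierarchy(row, levels=3):
    # row: flat list [Pos1, Brid1, Emid1, Pos2, Brid2, Emid2, ...]
    groups = [row[i * 3:(i + 1) * 3] for i in range(levels)]
    # reverse, then keep only groups containing at least one real value
    kept = [g for g in reversed(groups)
            if any(v not in (None, "") for v in g)]
    # pad back out to the full width with None triples
    padded = kept + [[None, None, None]] * (levels - len(kept))
    flat = [v for g in padded for v in g]
    return flat, len(kept)  # reversed columns + Hierarchy_level

raj = ["Associate", "G104", "100675284",
       "Consultant", "G105", "10078696",
       "Sr Consultant", "G106", "1837839"]
ken = ["Consultant", "G105", "10078696",
       "Sr Consultant", "G106", "1837839",
       "", None, None]

print(reverse_hierarchy(raj)[1])  # → 3
print(reverse_hierarchy(ken)[1])  # → 2
```

The Spark version does the same thing columnar-wise: `F.array(*groups)` builds the array of triples already reversed (note `range(30, 0, -1)`), and `filter`/`exists` drops the empty groups.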

    Testing with only 3 sets of columns.

    Input dataframe:

    from pyspark.sql import functions as F
    df = spark.createDataFrame(
        [("Raj", "Trainee", 30, "Associate", "G104", "100675284", "Consultant", "G105", "10078696", "Sr Consultant", "G106", "1837839"),
         ("Ken", "Associate", 31, "Consultant", "G105", "10078696", "Sr Consultant", "G106", "1837839", "", None, None)],
        ["Name", "Position", "Age", "Pos1", "Brid1", "Emid1", "Pos2", "Brid2", "Emid2", "Pos3", "Brid3", "Emid3"])
    df.show()
    # +----+---------+---+----------+-----+---------+-------------+-----+--------+-------------+-----+-------+
    # |Name| Position|Age|      Pos1|Brid1|    Emid1|         Pos2|Brid2|   Emid2|         Pos3|Brid3|  Emid3|
    # +----+---------+---+----------+-----+---------+-------------+-----+--------+-------------+-----+-------+
    # | Raj|  Trainee| 30| Associate| G104|100675284|   Consultant| G105|10078696|Sr Consultant| G106|1837839|
    # | Ken|Associate| 31|Consultant| G105| 10078696|Sr Consultant| G106| 1837839|             | null|   null|
    # +----+---------+---+----------+-----+---------+-------------+-----+--------+-------------+-----+-------+
    

    Script and results:

    • Spark 3.1+

      cols = ['Pos', 'Brid', 'Emid']
      groups = [F.array(df.colRegex(fr"`({'|'.join(cols)}){i}$`")) for i in range(3, 0, -1)]
      filtered = F.filter(F.array(*groups), lambda x: F.exists(x, lambda y: ~(F.isnull(y) | (y == F.lit("")))))
      df = df.select(
          "Name", "Position", "Age",
          *[filtered[x][i].alias(f"{c}{x+1}") for x in range(3) for i, c in enumerate(cols)],
          F.size(filtered).alias("Hierarchy_level")
      )
      df.show()
      # +----+---------+---+-------------+-----+-------+----------+-----+--------+---------+-----+---------+---------------+
      # |Name| Position|Age|         Pos1|Brid1|  Emid1|      Pos2|Brid2|   Emid2|     Pos3|Brid3|    Emid3|Hierarchy_level|
      # +----+---------+---+-------------+-----+-------+----------+-----+--------+---------+-----+---------+---------------+
      # | Raj|  Trainee| 30|Sr Consultant| G106|1837839|Consultant| G105|10078696|Associate| G104|100675284|              3|
      # | Ken|Associate| 31|Sr Consultant| G106|1837839|Consultant| G105|10078696|     null| null|     null|              2|
      # +----+---------+---+-------------+-----+-------+----------+-----+--------+---------+-----+---------+---------------+
      
    • Spark 2.4+

      cols = ['Pos', 'Brid', 'Emid']
      groups = [F.array(df.colRegex(fr"`({'|'.join(cols)}){i}$`")) for i in range(3, 0, -1)]
      df = df.withColumn('_temp', F.array(*groups))
      filtered = F.expr("filter(_temp, x -> exists(x, y -> !(isnull(y) or (y = ''))))")
      df = df.select(
          "Name", "Position", "Age",
          *[filtered[x][i].alias(f"{c}{x+1}") for x in range(3) for i, c in enumerate(cols)],
          F.size(filtered).alias("Hierarchy_level")
      )
      
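    The pattern handed to `colRegex` can be sanity-checked with Python's `re` module (this checks the regex itself, not Spark's matching; the backticks are Spark quoting and are stripped for plain `re`):

```python
import re

cols = ['Pos', 'Brid', 'Emid']
# the same pattern the answer builds for level i=3
pattern = fr"`({'|'.join(cols)})3$`".strip('`')  # -> (Pos|Brid|Emid)3$

names = ["Name", "Position", "Age",
         "Pos1", "Brid1", "Emid1", "Pos3", "Brid3", "Emid3"]
matched = [n for n in names if re.search(pattern, n)]
print(matched)  # → ['Pos3', 'Brid3', 'Emid3']
```

Note that the `$` anchor is what keeps `Pos3` from also matching `Pos30` when you scale up to 30 levels.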

    Proof of efficiency:

    df.explain()
    # == Physical Plan ==
    # Project [Name#3697, Position#3698, Age#3699L, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3780, lambdafunction((isnotnull(lambda x_30#3790) AND NOT (lambda x_30#3790 = )), lambda x_30#3790, false)), lambda x_29#3780, false))[0][0] AS Pos1#3770, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3781, lambdafunction((isnotnull(lambda x_30#3791) AND NOT (lambda x_30#3791 = )), lambda x_30#3791, false)), lambda x_29#3781, false))[0][1] AS Brid1#3771, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3782, lambdafunction((isnotnull(lambda x_30#3792) AND NOT (lambda x_30#3792 = )), lambda x_30#3792, false)), lambda x_29#3782, false))[0][2] AS Emid1#3772, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3783, lambdafunction((isnotnull(lambda x_30#3793) AND NOT (lambda x_30#3793 = )), lambda x_30#3793, false)), lambda x_29#3783, false))[1][0] AS Pos2#3773, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3784, lambdafunction((isnotnull(lambda x_30#3794) AND NOT (lambda x_30#3794 = )), lambda x_30#3794, false)), lambda x_29#3784, false))[1][1] AS Brid2#3774, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3785, lambdafunction((isnotnull(lambda x_30#3795) AND NOT (lambda x_30#3795 = )), lambda x_30#3795, false)), lambda 
x_29#3785, false))[1][2] AS Emid2#3775, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3786, lambdafunction((isnotnull(lambda x_30#3796) AND NOT (lambda x_30#3796 = )), lambda x_30#3796, false)), lambda x_29#3786, false))[2][0] AS Pos3#3776, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3787, lambdafunction((isnotnull(lambda x_30#3797) AND NOT (lambda x_30#3797 = )), lambda x_30#3797, false)), lambda x_29#3787, false))[2][1] AS Brid3#3777, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3788, lambdafunction((isnotnull(lambda x_30#3798) AND NOT (lambda x_30#3798 = )), lambda x_30#3798, false)), lambda x_29#3788, false))[2][2] AS Emid3#3778, size(filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3789, lambdafunction((isnotnull(lambda x_30#3799) AND NOT (lambda x_30#3799 = )), lambda x_30#3799, false)), lambda x_29#3789, false)), true) AS Hierarchy_level#3779]
    # +- *(1) Scan ExistingRDD[Name#3697,Position#3698,Age#3699L,Pos1#3700,Brid1#3701,Emid1#3702,Pos2#3703,Brid2#3704,Emid2#3705,Pos3#3706,Brid3#3707,Emid3#3708]