
Spark DataFrame: find and set the main root for child


I have the following Apache Spark Dataframe:

Parent - Child
A1 - A10
A1 - A2
A2 - A3
A3 - A4
A5 - A7
A7 - A6
A8 - A9

This DataFrame describes the connections between parents and children. Logically, the rows form a set of separate trees.

The main goal is to set the main root for each child. That means we should end up with the following DataFrame:

Parent - Child
A1 - A10
A1 - A2
A1 - A3
A1 - A4
A5 - A7
A5 - A6
A8 - A9


  • Everything should be implemented using Apache Spark.
  • The number of nodes is unlimited, which means the algorithm should work regardless of the number of nodes.
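To make the expected mapping concrete, here is a small reference sketch in plain Scala (local collections, not Spark, so it does not satisfy the first requirement). It only pins down the rule "replace each parent with the top-most ancestor of the child". The names `RootReference`, `parentOf`, `rootOf`, and `rooted` are hypothetical, and the sketch assumes every child has exactly one parent and the data contains no cycles.

```scala
// Plain-Scala reference for the expected result (local collections, not Spark).
object RootReference {
  // Edges from the question, as (parent, child) pairs.
  val edges: List[(String, String)] = List(
    ("A1", "A10"), ("A1", "A2"), ("A2", "A3"), ("A3", "A4"),
    ("A5", "A7"), ("A7", "A6"), ("A8", "A9")
  )

  // Lookup from a child to its direct parent.
  // Assumption: each child appears under exactly one parent.
  val parentOf: Map[String, String] = edges.map { case (p, c) => c -> p }.toMap

  // Follow parent links until reaching a node that has no parent.
  // Assumption: no cycles, otherwise this would never terminate.
  @annotation.tailrec
  def rootOf(node: String): String = parentOf.get(node) match {
    case Some(p) => rootOf(p)
    case None    => node
  }

  // Re-point every child at its root, keeping the original row order.
  val rooted: List[(String, String)] = edges.map { case (_, c) => (rootOf(c), c) }

  def main(args: Array[String]): Unit =
    rooted.foreach { case (p, c) => println(s"$p - $c") }
}
```

Running `RootReference.main` prints one `Parent - Child` line per input row, with the parent replaced by the root of that child's tree.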

Solution

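  • I believe you can achieve this with the approach below. Note that the sample input adds three extra edges (A4 - A11, A11 - A12, A6 - A13) to the data from the question.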

    import spark.implicits._ // enables toDF; `spark` is the active SparkSession
    val input_rdd = spark.sparkContext.parallelize(List(
      ("A1", "A10"), ("A1", "A2"), ("A2", "A3"), ("A3", "A4"), ("A5", "A7"),
      ("A7", "A6"), ("A8", "A9"), ("A4", "A11"), ("A11", "A12"), ("A6", "A13")))
    val input_df = input_rdd.toDF("Parent", "Child")
    input_df.createOrReplaceTempView("TABLE1") // findroot queries this view by name
    input_df.show()
    

    Input

    +------+-----+
    |Parent|Child|
    +------+-----+
    |    A1|  A10|
    |    A1|   A2|
    |    A2|   A3|
    |    A3|   A4|
    |    A5|   A7|
    |    A7|   A6|
    |    A8|   A9|
    |    A4|  A11|
    |   A11|  A12|
    |    A6|  A13|
    +------+-----+
    
    import org.apache.spark.sql.DataFrame

    // linkchild: keeps the existing parent-child pairs and adds the pairs
    // reachable through one more join (parent -> child -> child's child)
    def linkchild(df: DataFrame): DataFrame = {
      df.createOrReplaceTempView("TEMP")
      spark.sql(
        """select distinct a.parent, b.child
          |from TEMP a inner join TEMP b
          |  on a.parent = b.parent or a.child = b.parent""".stripMargin)
    }

    // findroot: recurses until every child in TABLE1 is linked to a root, then prints the result
    def findroot(rdf: DataFrame): Unit = {
      val link_child_df = linkchild(rdf)
      link_child_df.createOrReplaceTempView("TEMP1")

      // Root-to-descendant pairs: keep only parents that never appear as a child in TABLE1
      val rootedSql =
        """select distinct a.parent, b.child
          |from TEMP1 a inner join TEMP1 b
          |  on a.parent = b.parent or a.child = b.parent
          |where a.parent not in (select distinct child from TABLE1)""".stripMargin

      // Number of original rows whose child is not yet linked to a root
      val cnt = spark.sql(
        s"select * from TABLE1 where child not in (select child from ($rootedSql))").count()

      if (cnt == 0) {
        spark.sql(s"select * from ($rootedSql) order by parent, child").show()
      } else {
        findroot(link_child_df)
      }
    }

    // First call with input_df; findroot keeps calling linkchild until every child has its root
    findroot(input_df)
    

    Output

    +------+-----+
    |parent|child|
    +------+-----+
    |    A1|  A10|
    |    A1|  A11|
    |    A1|  A12|
    |    A1|   A2|
    |    A1|   A3|
    |    A1|   A4|
    |    A5|  A13|
    |    A5|   A6|
    |    A5|   A7|
    |    A8|   A9|
    +------+-----+
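
The recursion above can also be written as a loop over the DataFrame API instead of SQL over temp views. The following is a sketch of an alternative, not the method used in the answer: `RootResolver`, `resolveRoots`, and the helper columns `node` and `up` are hypothetical names. It assumes each child has a single parent and that the data contains no cycles (with a cycle the loop would not terminate).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{coalesce, col}

object RootResolver {
  // Replaces each row's Parent with the root of its tree.
  // Assumptions: columns are named Parent and Child, one parent per child, no cycles.
  def resolveRoots(edges: DataFrame): DataFrame = {
    // Lookup table: node -> its direct parent ("up").
    val up = edges.select(col("Child").as("node"), col("Parent").as("up"))

    var current = edges
    var pending = true
    while (pending) {
      // Attach the parent of each row's current Parent, if it has one.
      val joined = current.join(up, col("Parent") === col("node"), "left")
      // Stop once no Parent can be moved further up.
      pending = joined.filter(col("up").isNotNull).count() > 0
      // Move Parent one level up where possible; otherwise keep it.
      current = joined.select(coalesce(col("up"), col("Parent")).as("Parent"), col("Child"))
    }
    current
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("roots").getOrCreate()
    import spark.implicits._

    val edges = Seq(
      ("A1", "A10"), ("A1", "A2"), ("A2", "A3"), ("A3", "A4"),
      ("A5", "A7"), ("A7", "A6"), ("A8", "A9")
    ).toDF("Parent", "Child")

    resolveRoots(edges).orderBy("Parent", "Child").show()
    spark.stop()
  }
}
```

Each pass moves every parent one level up, so the number of Spark jobs grows with the depth of the deepest tree. For deep hierarchies, caching or checkpointing the intermediate DataFrame may be needed to keep the query plan from growing too large.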