Tags: scala, apache-spark, recursion, parquet

Create new column in dataframe with udf and recursion


I have two parquet files: one describes files by inode number, and the other describes each inode's name and parent inode. I need to reconstruct the full path from the second file.

My table with the inode descriptions is named idirs_table_read and is formatted like this (this is a complete example):

iparent,iname,ichild
93767723,folder12,40715069
65688175,level4,93767723
80373386,name,65688175
22746413,level2,80373386
24,base,22746413
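The lookup itself is a walk up the parent chain: find the row whose ichild is the inode, prepend its iname, and repeat with its iparent until no row matches. As a plain-Scala illustration (no Spark involved), the five sample rows can be held in a Map keyed by ichild; InodePathExample, parentOf and fullPath are hypothetical names used only for this sketch.

```scala
import scala.annotation.tailrec

// Plain-Scala sketch (hypothetical names): the sample rows of idirs_table_read,
// keyed by ichild, with value (iparent, iname).
object InodePathExample {
  val parentOf: Map[Int, (Int, String)] = Map(
    (40715069, (93767723, "folder12")),
    (93767723, (65688175, "level4")),
    (65688175, (80373386, "name")),
    (80373386, (22746413, "level2")),
    (22746413, (24, "base"))
  )

  // Walk up the parent chain, prepending each name, until an inode has no
  // parent entry (24 here) or is its own parent (a root).
  @tailrec
  def fullPath(inode: Int, suffix: String = ""): String =
    parentOf.get(inode) match {
      case Some((parent, name)) if parent != inode => fullPath(parent, "/" + name + suffix)
      case _                                      => suffix
    }
}
```

With this data, InodePathExample.fullPath(93767723) walks four levels up and returns /base/level2/name/level4.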

Given an inode number, I want to reconstruct its file path.
For example, for inode 93767723 the path is /base/level2/name/level4.
I have defined two functions (one recursive, the other procedural). Both work when called directly, e.g. newPathRecursive(1236549), but fail when used in withColumn:

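import org.apache.spark.sql.functions.udf
import spark.implicits._ // for $"..." (spark is the SparkSession, e.g. in spark-shell)

// Recursive: find the parent of `inumber`, then prepend the parent's own path.
def newPathRecursive(inumber: Int): String = {
    val rows = idirs_table_read.select("iparent", "iname").filter($"ichild" === inumber).collect()
    // Stop when there is no parent row, or when the inode is its own parent (root)
    if (rows.length == 1 && rows(0).getInt(0) != inumber)
        newPathRecursive(rows(0).getInt(0)) + "/" + rows(0).getString(1)
    else
        ""
}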


// Procedural: walk up the parent chain, prepending each name.
def newPathProcedurale(inumber: Int): String = {
    var composite = ""
    var go = true
    var parentInode = inumber
    while (go) {
        val rows = idirs_table_read.select("iparent", "iname")
            .filter($"ichild" === parentInode).collect()
        // Continue while exactly one parent exists and it is not the current inode itself (root)
        if (rows.length == 1 && rows(0).getInt(0) != parentInode) {
            parentInode = rows(0).getInt(0)
            composite = "/" + rows(0).getString(1) + composite
        } else {
            go = false
        }
    }
    composite
}
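// Both compile, but fail at runtime: their bodies query a DataFrame from inside a UDF
val buildpath2 = udf[String, Int](newPathProcedurale)
val buildpath = udf[String, Int](newPathRecursive)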

My goal is to replace the inode number in the other table with this path, but when I try to use the function in withColumn I get something like this:

 df.withColumn("newcol", buildpath($"inumber"))
 Caused by: java.lang.reflect.InvocationTargetException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1521.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1521.0 (TID 14859, ip.ip.ip.ip, executor 1): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (int) => string)

Could you please help me with this code, and perhaps advise me on a better implementation of this algorithm and a better way to use it?
My goal is simply to build a new parquet file from these two, with complete paths instead of inode numbers (which are not human-readable).
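As an alternative that needs no UDF at all, the path of every inode can be computed up front with repeated self-joins on the directory table, one join per directory level, and the result joined onto the other table. A minimal sketch, assuming the sample schema (iparent: Int, iname: String, ichild: Int) and Spark 2.3 or later for localCheckpoint; InodePathJoins, allPaths and maxDepth are hypothetical names, and maxDepth is only a guard against cycles.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, concat, lit, when}

// Sketch (hypothetical names): full path of every inode via one self-join per
// directory level, no UDF. Assumes the sample schema and Spark 2.3+.
object InodePathJoins {
  def allPaths(idirs: DataFrame, maxDepth: Int = 64): DataFrame = {
    // Parent edges; a root that is its own parent is dropped so the walk stops there
    val parents = idirs
      .filter(col("ichild") =!= col("iparent"))
      .select(col("ichild").as("p_child"), col("iparent").as("p_parent"), col("iname").as("p_name"))

    // Start with each inode's own name; `next` is the ancestor still to prepend
    var paths = idirs.select(
      col("ichild").as("inode"),
      col("iparent").as("next"),
      concat(lit("/"), col("iname")).as("path"))

    var depth = 0
    // Repeat while at least one row's `next` still has a parent edge
    while (depth < maxDepth &&
           paths.join(parents, col("next") === col("p_child"), "left_semi").take(1).nonEmpty) {
      paths = paths
        .join(parents, col("next") === col("p_child"), "left")
        .select(
          col("inode"),
          col("p_parent").as("next"), // null once the top of the tree is reached
          when(col("p_name").isNotNull, concat(lit("/"), col("p_name"), col("path")))
            .otherwise(col("path")).as("path"))
        .localCheckpoint() // truncate the growing query plan after each level
      depth += 1
    }
    paths.select("inode", "path")
  }
}
```

The result can then be joined onto the other table by inode number, e.g. df.join(paths, df("inumber") === paths("inode"), "left"), and written out with .write.parquet(...). Because everything stays in DataFrames, the directory table never has to fit in driver memory.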


Solution

  • The post this question was marked as a duplicate of actually describes a different problem, and I didn't understand its explanation because the real answer is in a comment rather than in the answer itself. Here is the answer:

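    What I want to do is not possible when the Dataset used inside the UDF is not local. A UDF runs on the executors, and DataFrame operations such as filter, count and first cannot be run from there, because they need the SparkSession, which exists only on the driver: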

    idirs_table_read.isLocal
    false
    

    So I'm going to use something else and I'll post the explanation.
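Following the solution above, one way to make the data "local" is to collect the small idirs table to the driver as a plain Scala Map and broadcast it, so the UDF only reads an in-memory map instead of querying a DataFrame. This sketch assumes the directory table fits in driver memory and that ichild/iparent are Int and iname is String, as in the sample; InodePaths, pathOf and withPaths are hypothetical names.

```scala
import scala.annotation.tailrec
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

// Sketch (hypothetical names), assuming idirs fits in driver memory.
object InodePaths {

  // Pure function over a Map: safe to call on executors.
  @tailrec
  def pathOf(parentOf: Map[Int, (Int, String)], inode: Int, suffix: String = ""): String =
    parentOf.get(inode) match {
      case Some((parent, name)) if parent != inode => pathOf(parentOf, parent, "/" + name + suffix)
      case _                                      => suffix
    }

  def withPaths(spark: SparkSession, idirs: DataFrame, df: DataFrame, inodeCol: String): DataFrame = {
    // Collect the directory table to the driver: ichild -> (iparent, iname)
    val parentOf: Map[Int, (Int, String)] = idirs
      .select("ichild", "iparent", "iname")
      .collect()
      .map(r => (r.getInt(0), (r.getInt(1), r.getString(2))))
      .toMap
    // Ship one read-only copy of the map to each executor
    val bc = spark.sparkContext.broadcast(parentOf)
    // The UDF only touches the broadcast value, never a DataFrame
    val buildPath = udf((inode: Int) => pathOf(bc.value, inode))
    df.withColumn("newcol", buildPath(col(inodeCol)))
  }
}
```

Applied to the original tables, something like InodePaths.withPaths(spark, idirs_table_read, df, "inumber").write.parquet(outputPath) would produce the new parquet file; outputPath is a placeholder. If the directory table is too large to collect to the driver, this approach does not apply.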