Tags: java, scala, apache-spark, apache-spark-sql, hadoop2

I want the Parquet file data below to end up in the string format shown. Can someone help me write a UDF for this in Spark/Scala? I am new to this. The output I am expecting is a string like:

Java|XX||Scala|XA

If the array contains more elements, the output should continue in the same pattern. Here is the schema and sample data:

```
root
 |-- name: string (nullable = true)
 |-- booksIntersted: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    


+----------+-------------------------+
|name      |booksIntersted           |
+----------+-------------------------+
|James     |[[Java, XX], [Scala, XA]]|
|Michael   |[[Java, XY], [Scala, XB]]|
|Robert    |[[Java, XZ], [Scala, XC]]|
|Washington|null                     |
+----------+-------------------------+
```
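
A minimal sketch of how a DataFrame with this schema could be built for testing (the case class name `Book` and the spark-shell setup are assumptions; the rows are taken from the table above):

```
import spark.implicits._   // already in scope inside spark-shell

case class Book(name: String, author: String)

// Sample rows copied from the table above; Washington's list is null
val df = Seq(
  ("James",      Seq(Book("Java", "XX"), Book("Scala", "XA"))),
  ("Michael",    Seq(Book("Java", "XY"), Book("Scala", "XB"))),
  ("Robert",     Seq(Book("Java", "XZ"), Book("Scala", "XC"))),
  ("Washington", null.asInstanceOf[Seq[Book]])
).toDF("name", "booksIntersted")

df.printSchema
```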

Solution

  • Check the code below.

    import org.apache.spark.sql.functions.{expr, size, when}

    // Build "name|author" for each element, then join the elements with "||"
    // (transform requires Spark 2.4+; null arrays fall through to null)
    val finalDF = df
      .withColumn(
        "booksIntersted",
        when(
          size($"booksIntersted") > 0,
          expr("concat_ws('||',transform(booksIntersted,x -> concat(x.name,'|',x.author)))")
        )
      )
    

    finalDF.printSchema

    root
     |-- name: string (nullable = true)
     |-- booksIntersted: string (nullable = true)
    

    finalDF.show(false)

    +----------+-----------------+
    |name      |booksIntersted   |
    +----------+-----------------+
    |James     |Java|XX||Scala|XA|
    |Michael   |Java|XY||Scala|XB|
    |Robert    |Java|XZ||Scala|XC|
    |Washington|null             |
    +----------+-----------------+
    
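    The same logic can also be run as plain Spark SQL (a sketch: the temp view name `books` is hypothetical, and `transform` requires Spark 2.4+):

    df.createOrReplaceTempView("books")

    spark.sql("""
      SELECT name,
             CASE WHEN size(booksIntersted) > 0
                  THEN concat_ws('||', transform(booksIntersted, x -> concat(x.name, '|', x.author)))
             END AS booksIntersted
      FROM books
    """).show(false)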

    Writing the data in CSV format:

    // repartition(1) collapses the result into a single CSV file
    finalDF
      .repartition(1)
      .write
      .format("csv")
      .option("header", "true")
      .save("/tmp/csv/data")
    
    cd /tmp/csv/data
    > cat part-00000-c8527721-5b25-4689-bfe4-028ac2873def-c000.csv
    name,booksIntersted
    James,Java|XX||Scala|XA
    Michael,Java|XY||Scala|XB
    Robert,Java|XZ||Scala|XC
    Washington,""
    
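    As a quick check (not part of the original answer), the written output can be read back with Spark's CSV reader using the same path:

    val readBack = spark.read
      .format("csv")
      .option("header", "true")
      .load("/tmp/csv/data")

    readBack.show(false)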

    Using a UDF:

    scala> import org.apache.spark.sql.Row
    scala> import org.apache.spark.sql.functions.udf

    // Join each struct as "name|author", separating elements with "||"
    scala> val combine = udf((row: Seq[Row]) => {
        row
          .map(r => r.getAs[String]("name") + "|" + r.getAs[String]("author"))
          .reduce(_ + "||" + _)
    })
    
    scala> df
        .withColumn(
          "booksInterstedNew",
          when(
            size($"booksIntersted") > 0,
            combine($"booksIntersted")
          )
        )
        .show(false)
    
    +----------+-------------------------+-----------------+
    |name      |booksIntersted           |booksInterstedNew|
    +----------+-------------------------+-----------------+
    |James     |[[Java, XX], [Scala, XA]]|Java|XX||Scala|XA|
    |Michael   |[[Java, XY], [Scala, XB]]|Java|XY||Scala|XB|
    |Robert    |[[Java, XZ], [Scala, XC]]|Java|XZ||Scala|XC|
    |Washington|[]                       |null             |
    +----------+-------------------------+-----------------+
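
    As a variant (not part of the original answer), the null/empty handling can be moved inside the UDF itself instead of wrapping the call in `when`; a sketch:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions.udf

    // Returns null for a null or empty array, otherwise "name|author" pairs joined by "||"
    val combineSafe = udf((rows: Seq[Row]) =>
      if (rows == null || rows.isEmpty) null
      else rows.map(r => r.getAs[String]("name") + "|" + r.getAs[String]("author")).mkString("||")
    )

    // $ comes from spark.implicits._, which is already in scope in spark-shell
    df.withColumn("booksInterstedNew", combineSafe($"booksIntersted")).show(false)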