scala, apache-spark

Get a field from an Apache Spark Row which is a WrappedArray / Seq into a List using Scala


Background

  • Fetching data in JSON format from a Delta table
  • Using Apache Spark and Scala

DATA FORMAT

  val factories = """
      {
        "cities": {
          "name": "Sao Paulo"
          "areas": [
            {
              "code": "41939",
              "type": "downtown"
            },
            {
              "code": "48294",
              "type": "residential"
            }
          ],
        },
       
        "domains": [
            {
               "id": "19sk2nfb",
               "name" : "defense"
            }
        ]
    }

CODE

This fetches the data from the Delta table and creates case class objects.

fetchedData is a DataFrame fetched using some criteria.

factoriesSchema is the schema of the JSON stored in the FactoryData column.
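
For context, here is a minimal sketch of the pieces the snippets below assume. The case class shapes (with List fields, which is what the question is driving at), the Delta path, and the hand-built schema are inferred from the sample JSON above; the original post does not show these definitions.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

// Case classes inferred from the sample JSON (names and field types are assumptions)
case class Area(areaType: String, code: String)
case class City(name: String, areas: List[Area])
case class Domain(id: String, name: String)

val spark = SparkSession.builder().appName("factories").getOrCreate()

// Hypothetical Delta read - the table path and filter are placeholders
val fetchedData = spark.read.format("delta")
  .load("/delta/factories")
  .where("FactoryData IS NOT NULL")

// One way to express factoriesSchema as a StructType matching the JSON layout
val factoriesSchema = StructType(Seq(
  StructField("cities", StructType(Seq(
    StructField("name", StringType),
    StructField("areas", ArrayType(StructType(Seq(
      StructField("code", StringType),
      StructField("type", StringType)
    ))))
  ))),
  StructField("domains", ArrayType(StructType(Seq(
    StructField("id", StringType),
    StructField("name", StringType)
  ))))
))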

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, from_json}

val structuredData =
  fetchedData.withColumn(
    "StructuredFactoryJson",
    from_json(col("FactoryData"), factoriesSchema)
  )

val factories = structuredData.collect().map { row =>
  val structJson = row.getAs[Row]("StructuredFactoryJson")
  val citiesRow = structJson.getAs[Row]("cities")
  val city = City(
    citiesRow.getAs[String]("name"),
    citiesRow
      .getAs[Seq[Row]]("areas")
      .map(areaRow =>
        Area(
          areaRow.getAs[String]("type"),
          areaRow.getAs[String]("code")
        )
      )
  )
  val domains = structJson
    .getAs[Seq[Row]]("domains")
    .map(domainRow =>
      ??? // domain construction truncated in the original
    )
  // ...
}


Problem

This works fine and a Seq is obtained. The issue is: is there any way to get a List instead of a Seq and still construct the bigger object in the same way?
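
As an aside, the "Wrapped array" in the title can be reproduced without Spark. On Scala 2.12 (the series most Spark builds target), an Array viewed as a Seq becomes a WrappedArray, which is what Row.getAs[Seq[Row]] hands back for an array column; the snippet below is just an illustration of that type, not part of the original post.

// A plain Array seen through Seq is wrapped on the fly (Scala 2.12; on 2.13 this becomes an ArraySeq)
val wrapped: Seq[Int] = Array(1, 2, 3)

println(wrapped.getClass.getName)       // scala.collection.mutable.WrappedArray$ofInt
println(wrapped.isInstanceOf[List[_]])  // false - it is a Seq, but not a List
println(wrapped.toList)                 // List(1, 2, 3) - the conversion has to be explicit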


Solution

  • After digging around a bit, I found two ways to accomplish this:

    1. Using JavaConverters

    This approach was first discovered in an attempt to get a List from the Row instead of a Seq. However, the list returned by getList is a Java List, so it has to be converted to a Scala List.

    import scala.collection.JavaConverters._

    val factories = structuredData.collect().map { row =>
      val structJson = row.getAs[Row]("StructuredFactoryJson")
      val citiesRow = structJson.getAs[Row]("cities")
      val city = City(
        citiesRow.getAs[String]("name"),
        citiesRow
          .getList[Row](citiesRow.fieldIndex("areas"))
          .asScala
          .map(areaRow =>
            Area(
              areaRow.getAs[String]("type"),
              areaRow.getAs[String]("code")
            )
          ).toList
      )
      val domains = structJson
        .getList[Row](structJson.fieldIndex("domains"))
        .asScala
        .map(domainRow =>
          ??? // domain construction truncated in the original
        ).toList
      // ...
    }
    

    Issues

    • Version and package: the details about the JavaConverters version and package can be found in this SO question; a quick sketch of the difference follows this list.

    • There can be a performance overhead when converting the lists.
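
    For reference, a rough sketch of what the version difference looks like; the package names below come from the Scala standard library, not from the original post:

    // Scala 2.12 and earlier (deprecated from 2.13 onwards)
    import scala.collection.JavaConverters._

    // Scala 2.13+ replacement:
    // import scala.jdk.CollectionConverters._

    val javaList = new java.util.ArrayList[String]()
    javaList.add("downtown")

    // asScala gives a Buffer view over the Java list; toList copies it into an immutable Scala List
    val scalaList: List[String] = javaList.asScala.toList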

    2. Without JavaConverters

    After some more searching I found a cleaner approach and settled on this one.

    val factories = structuredData.collect().map { row =>
      val structJson = row.getAs[Row]("StructuredFactoryJson")
      val citiesRow = structJson.getAs[Row]("cities")
      val city = City(
        citiesRow.getAs[String]("name"),
        citiesRow
          .getSeq[Row](citiesRow.fieldIndex("areas"))
          .map(areaRow =>
            Area(
              areaRow.getAs[String]("type"),
              areaRow.getAs[String]("code")
            )
          ).toList
      )
      // getSeq already returns a Scala Seq, so no asScala conversion is needed
      val domains = structJson
        .getSeq[Row](structJson.fieldIndex("domains"))
        .map(domainRow =>
          ??? // domain construction truncated in the original
        ).toList
      // ...
    }
    

    This resolved the issue.
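
    Putting it together with the case classes assumed earlier, the whole mapping might look like the sketch below. The Domain fields (id, name) are guessed from the sample JSON, and the returned tuple is a placeholder, since the original snippet is truncated at that point.

    val factories = structuredData.collect().map { row =>
      val structJson = row.getAs[Row]("StructuredFactoryJson")
      val citiesRow  = structJson.getAs[Row]("cities")

      val city = City(
        citiesRow.getAs[String]("name"),
        citiesRow
          .getSeq[Row](citiesRow.fieldIndex("areas"))
          .map(areaRow => Area(areaRow.getAs[String]("type"), areaRow.getAs[String]("code")))
          .toList
      )

      // id/name are assumptions based on the sample JSON's "domains" entries
      val domains = structJson
        .getSeq[Row](structJson.fieldIndex("domains"))
        .map(domainRow => Domain(domainRow.getAs[String]("id"), domainRow.getAs[String]("name")))
        .toList

      (city, domains) // the original post does not show what the block returns
    }.toList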