How to generate a merged string from JSON values in Scala?


Input data in a JSON file:

[{
    "orders": {
        "order_id": {
            "path": "orderid",
            "type": "string"
        },
        "customer_id": {
            "path": "customers.customerId",
            "type": "string"
        },
        "offer_id": {
            "path": "Offers.Offerid",
            "type": "string"
        }
    },
    "products": {
        "product_id": {
            "path": "product_id",
            "type": "string"
        },
        "product_name": {
            "path": "products.productname",
            "type": "string"
        }
    }
}]

For orders, I am trying to generate a concatenated string from the path, the type, and the column key of each entry, which I then want to use in a Spark SQL statement. For example:

cast(orderid as string) as order_id -- orderid comes from the path key, string comes from the type key, and order_id is the key under orders

Expected output:
cast(orderid as string) as order_id,cast(customers.customerId as string) as customer_id,cast(Offers.Offerid as string) as offer_id
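
To make the target format concrete, here is a minimal plain-Scala sketch of the string construction. It assumes the mapping has already been parsed into memory; `ColumnSpec`, `CastListSketch`, and the hand-written `orders` sequence are hypothetical names used only for illustration and are not part of the question.

```scala
// Hypothetical model of one mapping entry: column key, source path, target type.
final case class ColumnSpec(key: String, path: String, dataType: String)

object CastListSketch {
  // Render one entry as "cast(<path> as <type>) as <key>".
  def castExpr(c: ColumnSpec): String =
    s"cast(${c.path} as ${c.dataType}) as ${c.key}"

  // Join all entries with commas, preserving the order of the input sequence.
  def castList(cols: Seq[ColumnSpec]): String =
    cols.map(castExpr).mkString(",")

  // The "orders" block from the input, written out by hand for illustration.
  val orders: Seq[ColumnSpec] = Seq(
    ColumnSpec("order_id", "orderid", "string"),
    ColumnSpec("customer_id", "customers.customerId", "string"),
    ColumnSpec("offer_id", "Offers.Offerid", "string")
  )

  def main(args: Array[String]): Unit =
    println(castList(orders))
}
```

Running `CastListSketch.main` prints the expected output line above, with the entries in the same order as the input sequence.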

Solution

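  • Check the code below. It converts the orders struct into a map keyed by column name, builds one CAST expression per map entry, and joins the expressions into a single string. The entries follow the field order shown by df.show, which is why customer_id comes first in the result.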

    df.show(false)
    +-----------------------------------------------------------------------------+------------------------------------------------------+
    |orders                                                                       |products                                              |
    +-----------------------------------------------------------------------------+------------------------------------------------------+
    |{{customers.customerId, string}, {Offers.Offerid, string}, {orderid, string}}|{{product_id, string}, {products.productname, string}}|
    +-----------------------------------------------------------------------------+------------------------------------------------------+
    
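    import org.apache.spark.sql.functions.expr

    // Name of the top-level column to process ("orders" or "products").
    val columnName = "Orders".toLowerCase

    // For each map entry, build "CAST( <path> AS <type> ) AS <key>" and join the results with ", ".
    val outputExpr = s"""
        concat_ws(
            ', ',
            transform(
                map_entries(${columnName}),
                e -> concat('CAST( ', e.value.path, ' AS ', e.value.type, ' )', ' AS ', e.key)
            )
        ) AS output
        """

    // Target type for the conversion: column key -> (path, type).
    val schema = "map<string, struct<path:string,type:string>>"

    df
      // Round-trip through JSON to turn the inferred struct into a map.
      .withColumn(
        columnName,
        expr(s"from_json(to_json(${columnName}), '${schema}')")
      )
      .selectExpr(outputExpr)
      .show(false)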
    
    +------------------------------------------------------------------------------------------------------------------------------------------+
    |output                                                                                                                                    |
    +------------------------------------------------------------------------------------------------------------------------------------------+
    |CAST( customers.customerId AS string ) AS customer_id, CAST( Offers.Offerid AS string ) AS offer_id, CAST( orderid AS string ) AS order_id|
    +------------------------------------------------------------------------------------------------------------------------------------------+
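
Since the goal is to use this string in a Spark SQL statement, the following sketch shows one way the generated list might be wrapped into a query. It is plain Scala and only builds the text; `SelectStatementSketch`, `selectStatement`, and the view name `orders_raw` are hypothetical and do not appear in the original answer.

```scala
object SelectStatementSketch {
  // The string produced by the `output` column above, copied verbatim.
  val castList: String =
    "CAST( customers.customerId AS string ) AS customer_id, " +
    "CAST( Offers.Offerid AS string ) AS offer_id, " +
    "CAST( orderid AS string ) AS order_id"

  // Wrap the generated projection list in a SELECT; `table` is a hypothetical view name.
  def selectStatement(projections: String, table: String): String =
    s"SELECT $projections FROM $table"

  def main(args: Array[String]): Unit =
    println(selectStatement(castList, "orders_raw"))
}
```

In a Spark job, the string would presumably be read from the result with something like `.first().getString(0)` instead of being hard-coded, and the statement passed to `spark.sql(...)` once a view with the chosen name has been registered.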