Tags: scala, apache-spark, apache-zeppelin

Convert Json WrappedArray to String using spark sql


I'm working in a Zeppelin notebook and trying to load data from a table using Spark SQL. In the table, each row has one column which is a JSON blob, for example: [{'timestamp':12345,'value':10},{'timestamp':12346,'value':11},{'timestamp':12347,'value':12}]

I want to select the JSON blob as a string, just like the original text, but Spark automatically loads it as a WrappedArray.

It seems that I have to write a UDF to convert the WrappedArray to a string. The following is my code.

I first define a Scala function, then register it, and then apply the registered function to the column.

val unwraparr = udf((x: WrappedArray[(Int, Int)]) => x.map { case Row(val1: String, val2: String) => val1 + "," + val2 })
sqlContext.udf.register("fwa", unwraparr)

It doesn't work. I would really appreciate it if anyone could help.

The following is the schema of the part I'm working on; there will be many value and timeStamp pairs.

 |-- targetColumn: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- value: long (nullable = true)
 |    |    |-- timeStamp: string (nullable = true)

UPDATE: I came up with the following code:

val f = (x: Seq[Row]) => x.map { case Row(val1: Long, val2: String) => x.mkString("+") }

I need it to concatenate the objects/structs/rows (not sure what to call the struct) into a single string.


Solution

  • If the data you loaded as a dataframe/dataset in Spark looks like the following, with this schema:

    +------------------------------------+
    |targetColumn                        |
    +------------------------------------+
    |[[12345,10], [12346,11], [12347,12]]|
    |[[12345,10], [12346,11], [12347,12]]|
    +------------------------------------+
    
    root
     |-- targetColumn: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- timeStamp: string (nullable = true)
     |    |    |-- value: long (nullable = true)
    

    Then you can write the dataframe out as JSON to a temporary location, read it back as a text file, parse each line into a String, and convert the result to a dataframe as below (/home/testing/test.json is the temporary JSON output location):

    import org.apache.spark.sql.{Row, SaveMode}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // write each row out as one JSON line (the path is an output directory of part files)
    df.write.mode(SaveMode.Overwrite).json("/home/testing/test.json")

    // read the JSON lines back as plain text
    val data = sc.textFile("/home/testing/test.json")

    // keep only the array contents: drop everything up to ":[" and the trailing "]}"
    val rowRdd = data.map(jsonLine => Row(jsonLine.split(":\\[")(1).replace("]}", "")))
    val stringDF = sqlContext.createDataFrame(rowRdd, StructType(Array(StructField("targetColumn", StringType, true))))
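    As a quick sanity check of the parsing step, here is the same split/replace logic applied to one sample line in plain Scala (no Spark needed; the sample line mirrors the assumed output format of df.write.json):

```scala
// One line as Spark's json writer would emit it (assumed format)
val jsonLine = """{"targetColumn":[{"timeStamp":"12345","value":10},{"timeStamp":"12346","value":11}]}"""

// Same parsing as the RDD map above: keep only the array contents
val extracted = jsonLine.split(":\\[")(1).replace("]}", "")
// extracted == {"timeStamp":"12345","value":10},{"timeStamp":"12346","value":11}
```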
    

    This should leave you with the following dataframe and schema:

    +--------------------------------------------------------------------------------------------------+
    |targetColumn                                                                                      |
    +--------------------------------------------------------------------------------------------------+
    |{"timeStamp":"12345","value":10},{"timeStamp":"12346","value":11},{"timeStamp":"12347","value":12}|
    |{"timeStamp":"12345","value":10},{"timeStamp":"12346","value":11},{"timeStamp":"12347","value":12}|
    +--------------------------------------------------------------------------------------------------+
    
    root
     |-- targetColumn: string (nullable = true)
    

    I hope the answer is helpful.

    Reading initially as text, not as a dataframe

    If your source is a JSON file to begin with, you can apply the second phase of this answer (reading the json file as text and parsing it) directly as your first phase, instead of loading a dataframe and writing it back out.
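    Alternatively, if you prefer the UDF route from the question, the per-element formatting can be sketched on plain Scala tuples (a Spark-free sketch; in a real UDF the same pattern match would be done on Row values of the element struct):

```scala
// Sketch of the per-element formatting the question's UDF attempts,
// using plain tuples in place of Spark Row objects
val targetColumn = Seq(("12345", 10L), ("12346", 11L), ("12347", 12L))

val asJson = targetColumn
  .map { case (ts, v) => s"""{"timeStamp":"$ts","value":$v}""" }
  .mkString(",")
// asJson == {"timeStamp":"12345","value":10},{"timeStamp":"12346","value":11},{"timeStamp":"12347","value":12}
```

    The same mkString(",") collapse is what turns the Seq of per-element strings into the single string the question asks for.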