I'm working in a Zeppelin notebook and trying to load data from a table using SQL.
In the table, each row has one column which is a JSON blob. For example, [{'timestamp':12345,'value':10},{'timestamp':12346,'value':11},{'timestamp':12347,'value':12}]
I want to select the JSON blob as a string, exactly like the original string, but Spark automatically loads it as a WrappedArray.
It seems that I have to write a UDF to convert the WrappedArray to a string. The following is my code.
I first define a Scala function and register it, and then use the registered function on the column.
val unwraparr = udf ((x: WrappedArray[(Int, Int)]) => x.map { case Row(val1: String) => + "," + val2 })
sqlContext.udf.register("fwa", unwraparr)
It doesn't work. I would really appreciate it if anyone could help.
The following is the schema of the part I'm working on. There will be many value and timeStamp pairs.
|-- targetColumn: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- value: long (nullable = true)
| | |-- timeStamp: string (nullable = true)
UPDATE: I came up with the following code:
val f = (x: Seq[Row]) => x.map { case Row(val1: Long, val2: String) => x.mkString("+") }
I need it to concatenate the objects/structs/rows (not sure what to call the struct) into a single string.
If your data is loaded as a DataFrame/Dataset in Spark with the contents and schema shown below:
+------------------------------------+
|targetColumn |
+------------------------------------+
|[[12345,10], [12346,11], [12347,12]]|
|[[12345,10], [12346,11], [12347,12]]|
+------------------------------------+
root
|-- targetColumn: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- timeStamp: string (nullable = true)
| | |-- value: long (nullable = true)
Then you can write the DataFrame as JSON to a temporary file, read it back as a text file, parse each String line, and convert the result to a DataFrame, as below (/home/testing/test.json is the temporary JSON file location):
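import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
// Write the DataFrame as JSON, then read the output back as plain text lines.
df.write.mode(SaveMode.Overwrite).json("/home/testing/test.json")
val data = sc.textFile("/home/testing/test.json")
// Keep only the array contents: the text after ":[" with the closing "]}" removed.
val rowRdd = data.map(jsonLine => Row(jsonLine.split(":\\[")(1).replace("]}", "")))
val stringDF = sqlContext.createDataFrame(rowRdd, StructType(Array(StructField("targetColumn", StringType, true))))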
Which should leave you with the following DataFrame and schema:
+--------------------------------------------------------------------------------------------------+
|targetColumn |
+--------------------------------------------------------------------------------------------------+
|{"timeStamp":"12345","value":10},{"timeStamp":"12346","value":11},{"timeStamp":"12347","value":12}|
|{"timeStamp":"12345","value":10},{"timeStamp":"12346","value":11},{"timeStamp":"12347","value":12}|
+--------------------------------------------------------------------------------------------------+
root
|-- targetColumn: string (nullable = true)
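The string handling inside the map step can be tried on a single line without Spark. This is only a sketch: the object name ParseJsonLineDemo and the helper stripArrayWrapper are hypothetical, and the sample line is an assumption about what one JSON record looks like for the schema above.

```scala
object ParseJsonLineDemo {
  // Hypothetical helper mirroring the map step: keep the text after the
  // first ":[" and remove the closing "]}" of the record.
  def stripArrayWrapper(jsonLine: String): String =
    jsonLine.split(":\\[")(1).replace("]}", "")

  def main(args: Array[String]): Unit = {
    // Assumed shape of one written JSON line for the targetColumn schema.
    val sample =
      """{"targetColumn":[{"timeStamp":"12345","value":10},{"timeStamp":"12346","value":11}]}"""
    // Prints the struct objects separated by commas, without the outer wrapper.
    println(stripArrayWrapper(sample))
  }
}
```

Note that this approach relies on plain string matching, so it would break if a timeStamp value itself contained ":[" or "]}", and indexing with (1) would throw for any line that has no ":[" at all (for example, a row whose targetColumn is missing from the written JSON).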
I hope the answer is helpful
Alternatively, read the data initially as text rather than as a DataFrame.
You can apply the second phase of my answer (reading the JSON file as text and parsing each line) in your first phase, where you originally obtain the DataFrame.
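If writing a temporary file is not desirable, another option that may be worth trying is the built-in to_json function. This is a different technique from the one described above, not part of the original answer. The sketch assumes a Spark version with SparkSession and a to_json that accepts array-of-struct columns (I believe 2.2 or later); the case classes Reading and Record and the sample values are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, to_json}

object ToJsonSketch {
  // Hypothetical case classes that reproduce the array-of-struct schema.
  case class Reading(timeStamp: String, value: Long)
  case class Record(targetColumn: Seq[Reading])

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; in Zeppelin a session already exists.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("to-json-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(Record(Seq(Reading("12345", 10L), Reading("12346", 11L)))).toDF()

    // Replace the array column with its JSON text representation.
    val stringDF = df.withColumn("targetColumn", to_json(col("targetColumn")))

    stringDF.printSchema() // targetColumn should now be reported as a string
    stringDF.show(false)

    spark.stop()
  }
}
```

Unlike the split/replace approach, the resulting string keeps the surrounding square brackets of the JSON array, so they would need to be stripped separately if only the comma-separated objects are wanted.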