Tags: scala, apache-spark, databricks, spark-structured-streaming, azure-eventhub

Parsing Event Hub complex array type messages using Spark Structured Streaming


I need to parse an array type inside the message body while reading from Event Hub. We have a nested JSON message, but we are not able to parse it:

{"Name": "Rohit","Salary": "29292","EmpID": 12,"Projects": [{"ProjectID": "9191","ProjectName": "abc","Duration": "79"},{"ProjectID": "9192","ProjectName": "xyz","Duration": "75"}]}

I am trying to define the schema as below, but there seems to be some issue:

val testSchema = new StructType()
  .add("Name", StringType)
  .add("Salary", StringType)
  .add("EmpID", StringType)      
  .add("Projects", new ArrayType(new StructType()
    .add("ProjectID", StringType)
    .add("ProjectName", StringType)
    .add("Duration", StringType)))

Any help would be really appreciated.


Solution

  • Without the exact error it's hard to say, but it looks like you have an error in the schema definition: the two-argument ArrayType constructor also needs a flag specifying whether the array elements can be null (see the true flag after the nested StructType). The following schema works fine both when reading the JSON directly and when transforming it from a string:

    import org.apache.spark.sql.types._

    val testSchema = new StructType()
      .add("Name", StringType)
      .add("Salary", StringType)
      .add("EmpID", StringType)
      .add("Projects", new ArrayType(new StructType()
        .add("ProjectID", StringType)
        .add("ProjectName", StringType)
        .add("Duration", StringType), true)) // true = array elements may be null
    
    scala> spark.read.schema(testSchema).json("file.json").show(truncate=false)
    +-----+------+-----+----------------------------------+
    |Name |Salary|EmpID|Projects                          |
    +-----+------+-----+----------------------------------+
    |Rohit|29292 |12   |[{9191, abc, 79}, {9192, xyz, 75}]|
    +-----+------+-----+----------------------------------+
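
    If you then need to work with the individual array elements, you can explode the Projects column and flatten the nested struct. A minimal sketch on top of the same read (the alias Project is just an illustrative name):

    import org.apache.spark.sql.functions.explode

    // One output row per element of the Projects array;
    // "Project.*" flattens ProjectID, ProjectName and Duration into top-level columns
    val projects = spark.read.schema(testSchema).json("file.json")
      .select($"Name", explode($"Projects").as("Project"))
      .select("Name", "Project.*")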
    
    scala> import org.apache.spark.sql.functions._
    import org.apache.spark.sql.functions._
    
    scala> val df = spark.read.text("file.json")
    df: org.apache.spark.sql.DataFrame = [value: string]
    
    scala> df.select(from_json($"value", testSchema)).show(truncate=false)
    +------------------------------------------------------+
    |from_json(value)                                      |
    +------------------------------------------------------+
    |{Rohit, 29292, 12, [{9191, abc, 79}, {9192, xyz, 75}]}|
    +------------------------------------------------------+
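
    Since the question is about reading from Event Hub, the same schema can be applied to the streaming body column as well. A minimal sketch, assuming the azure-eventhubs-spark connector and placeholder connection settings (both are assumptions, not taken from the question):

    import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf}
    import org.apache.spark.sql.functions.from_json

    // Placeholder connection details -- substitute your own Event Hub settings
    val connectionString = ConnectionStringBuilder("<event-hub-connection-string>")
      .setEventHubName("<event-hub-name>")
      .build
    val ehConf = EventHubsConf(connectionString)

    val parsed = spark.readStream
      .format("eventhubs")
      .options(ehConf.toMap)
      .load()
      // the Event Hub body arrives as binary; cast to string, then parse with the schema above
      .select(from_json($"body".cast("string"), testSchema).as("data"))
      .select("data.*")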