
Pyspark structured streaming - Union data from 2 nested JSON


I have two Kafka streaming DataFrames. Their Spark schemas look like this:

root
 |-- key: string (nullable = true)
 |-- pmudata1: struct (nullable = true)
 |    |-- pmu_id: byte (nullable = true)
 |    |-- time: timestamp (nullable = true)
 |    |-- stream_id: byte (nullable = true)
 |    |-- stat: string (nullable = true)

and

root
 |-- key: string (nullable = true)
 |-- pmudata2: struct (nullable = true)
 |    |-- pmu_id: byte (nullable = true)
 |    |-- time: timestamp (nullable = true)
 |    |-- stream_id: byte (nullable = true)
 |    |-- stat: string (nullable = true)

How can I union all rows from both streams as they come in, by a specific batch window? The positions of the columns in both streams are the same. Each stream has a different pmu_id value, so I can differentiate records by that value.

unionByName or union only produces a stream from a single DataFrame.

I guess I would need to explode the column names, something like this, but that is for Scala. Is there a way to automatically explode the whole JSON into columns and union them?


Solution

  • You can use the explode function only with array and map types. In your case, the column pmudata2 has type StructType, so simply use the star * to select all its sub-fields, like this:

    df1 = df.selectExpr("key", "pmudata2.*")
    
    #root
    #|-- key: string (nullable = true)
    #|-- pmu_id: byte (nullable = true)
    #|-- time: timestamp (nullable = true)
    #|-- stream_id: byte (nullable = true)
    #|-- stat: string (nullable = true)
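
    To address the union part of the question: once both streams are flattened the same way, they can be combined with unionByName. Here is a minimal sketch, where df_a and df_b are placeholder names for the two Kafka DataFrames from the question, and the 10-second window and 1-minute watermark are assumed values to adjust to your batch window:

    from pyspark.sql import functions as F

    # Flatten each stream: keep key plus every sub-field of its struct column
    flat_a = df_a.selectExpr("key", "pmudata1.*")
    flat_b = df_b.selectExpr("key", "pmudata2.*")

    # Both sides now expose identical top-level columns,
    # so unionByName merges them by column name
    combined = flat_a.unionByName(flat_b)

    # Records from the two streams can still be told apart by pmu_id,
    # e.g. when aggregating per event-time window (the watermark is
    # needed for append output mode; adjust or drop it as required)
    windowed = (combined
                .withWatermark("time", "1 minute")
                .groupBy(F.window("time", "10 seconds"), "pmu_id")
                .count())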