scala, dataframe, apache-spark, apache-spark-dataset

Spark Scala: convert a nested DataFrame to a nested Dataset


I have a nested DataFrame "inputFlowRecordsAgg" which has the following schema:

root
 |-- FlowI.key: string (nullable = true)
 |-- FlowS.minFlowTime: long (nullable = true)
 |-- FlowS.maxFlowTime: long (nullable = true)
 |-- FlowS.flowStartedCount: long (nullable = true)
 |-- FlowI.DestPort: integer (nullable = true)
 |-- FlowI.SrcIP: struct (nullable = true)
 |    |-- bytes: binary (nullable = true)
 |-- FlowI.DestIP: struct (nullable = true)
 |    |-- bytes: binary (nullable = true)
 |-- FlowI.L4Protocol: byte (nullable = true)
 |-- FlowI.Direction: byte (nullable = true)
 |-- FlowI.Status: byte (nullable = true)
 |-- FlowI.Mac: string (nullable = true)

I want to convert it into a nested Dataset of the following case classes:

case class InputFlowV1(FlowI: FlowI,
                       FlowS: FlowS)

case class FlowI(Mac: String,
                 SrcIP: IPAddress,
                 DestIP: IPAddress,
                 DestPort: Int,
                 L4Protocol: Byte,
                 Direction: Byte,
                 Status: Byte,
                 var key: String = "")

case class FlowS(var minFlowTime: Long,
                 var maxFlowTime: Long,
                 var flowStartedCount: Long)
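The schema that .as[InputFlowV1] expects can be inspected without any data by asking Spark for the case-class encoder's schema. This is a minimal sketch, assuming IPAddress is a case class with a single bytes: Array[Byte] field (inferred from the "bytes: binary" structs in the schema above; its real definition is not shown) and using a hypothetical ExpectedSchema object:

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.StructType

// Assumption: IPAddress is not shown in the question; this shape is inferred
// from the `bytes: binary` struct in the printed schema.
case class IPAddress(bytes: Array[Byte])

case class FlowI(Mac: String, SrcIP: IPAddress, DestIP: IPAddress, DestPort: Int,
                 L4Protocol: Byte, Direction: Byte, Status: Byte, var key: String = "")
case class FlowS(var minFlowTime: Long, var maxFlowTime: Long, var flowStartedCount: Long)
case class InputFlowV1(FlowI: FlowI, FlowS: FlowS)

object ExpectedSchema {
  // The schema Spark derives from the case classes; .as[InputFlowV1] resolves against this
  val expected: StructType = Encoders.product[InputFlowV1].schema

  def main(args: Array[String]): Unit =
    expected.printTreeString() // two top-level struct columns: FlowI and FlowS
}
```

Because the expected schema has exactly two top-level columns, FlowI and FlowS, a DataFrame whose top-level column names are "FlowI.key", "FlowS.minFlowTime" and so on cannot be converted directly.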

But when I try to convert it using inputFlowRecordsAgg.as[InputFlowV1], I get this error:

org.apache.spark.sql.AnalysisException: cannot resolve '`FlowI`' given input columns: [FlowI.DestIP,FlowI.Direction, FlowI.key, FlowS.maxFlowTime, FlowI.SrcIP, FlowS.flowStartedCount, FlowI.L4Protocol, FlowI.Mac, FlowI.DestPort, FlowS.minFlowTime, FlowI.Status];
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)

A commenter asked for the full code, so here it is:

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions._

def getReducedFlowR(inputFlowRecords: Dataset[InputFlowV1],
                    @transient spark: SparkSession): Dataset[InputFlowV1] = {
  import spark.implicits._ // provides the Encoder needed by .as[InputFlowV1]

  val inputFlowRecordsAgg = inputFlowRecords.groupBy(column("FlowI.key") as "FlowI.key")
    .agg(min("FlowS.minFlowTime") as "FlowS.minFlowTime", max("FlowS.maxFlowTime") as "FlowS.maxFlowTime",
      sum("FlowS.flowStartedCount") as "FlowS.flowStartedCount",
      first("FlowI.Mac") as "FlowI.Mac",
      first("FlowI.SrcIP") as "FlowI.SrcIP", first("FlowI.DestIP") as "FlowI.DestIP",
      first("FlowI.DestPort") as "FlowI.DestPort",
      first("FlowI.L4Protocol") as "FlowI.L4Protocol",
      first("FlowI.Direction") as "FlowI.Direction", first("FlowI.Status") as "FlowI.Status")

  inputFlowRecordsAgg.printSchema()

  inputFlowRecordsAgg.as[InputFlowV1] // throws the AnalysisException shown above
}
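The error can be reproduced with a much smaller example. This sketch assumes a local SparkSession and uses hypothetical, cut-down Inner/Outer case classes in place of the real ones. It shows that aliasing a column as "FlowI.key" yields a single flat column with a dot in its name (which has to be back-quoted to be referenced), and that .as[Outer] then fails because no column named FlowI exists:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import scala.util.Try

// Hypothetical, cut-down stand-ins for FlowI / InputFlowV1, used only for this repro
case class Inner(key: String)
case class Outer(FlowI: Inner)

object DottedAliasRepro {
  val spark: SparkSession =
    SparkSession.builder().master("local[1]").appName("dotted-alias-repro").getOrCreate()
  import spark.implicits._

  val ds = Seq(Outer(Inner("a")), Outer(Inner("a"))).toDS()

  // column("FlowI.key") reads the nested field, but the alias "FlowI.key" produces ONE
  // top-level column whose name contains a dot; no struct column FlowI is created.
  val agg = ds.groupBy(column("FlowI.key") as "FlowI.key").count()

  // Such a name has to be back-quoted to be read as a single column
  val keys: Array[String] = agg.select(col("`FlowI.key`")).as[String].collect()

  // Converting back fails: Outer needs a column named FlowI and there is none
  val conversion: Try[Array[Outer]] = Try(agg.as[Outer].collect())

  def main(args: Array[String]): Unit = {
    println(agg.columns.mkString(", "))
    println(conversion)
    spark.stop()
  }
}
```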

Solution

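  • The reason is that your case class schema does not match the actual data schema. Aliases such as "FlowI.key" create top-level columns whose names literally contain a dot; they do not create a struct column named FlowI, which is exactly the column the error says it cannot resolve. Compare your data schema with the case class schema below and make them match; then the conversion will work.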

    Your case class schema is:

    scala> df.printSchema
    root
     |-- FlowI: struct (nullable = true)
     |    |-- Mac: string (nullable = true)
     |    |-- SrcIP: string (nullable = true)
     |    |-- DestIP: string (nullable = true)
     |    |-- DestPort: integer (nullable = false)
     |    |-- L4Protocol: byte (nullable = false)
     |    |-- Direction: byte (nullable = false)
     |    |-- Status: byte (nullable = false)
     |    |-- key: string (nullable = true)
     |-- FlowS: struct (nullable = true)
     |    |-- minFlowTime: long (nullable = false)
     |    |-- maxFlowTime: long (nullable = false)
     |    |-- flowStartedCount: long (nullable = false)
    
    

    Change your code as shown below (alias to plain names, then wrap the columns in FlowI and FlowS structs) and it should work:

    import org.apache.spark.sql.functions._
    import spark.implicits._ // provides $"..." and the Encoder for .as[InputFlowV1]

    // 1. Aggregate into plain column names (no dots)
    val inputFlowRecordsAgg = inputFlowRecords.groupBy(column("FlowI.key") as "key")
      .agg(min("FlowS.minFlowTime") as "minFlowTime", max("FlowS.maxFlowTime") as "maxFlowTime",
        sum("FlowS.flowStartedCount") as "flowStartedCount",
        first("FlowI.Mac") as "Mac",
        first("FlowI.SrcIP") as "SrcIP", first("FlowI.DestIP") as "DestIP",
        first("FlowI.DestPort") as "DestPort",
        first("FlowI.L4Protocol") as "L4Protocol",
        first("FlowI.Direction") as "Direction", first("FlowI.Status") as "Status")
      // 2. Pack them into the FlowI and FlowS structs the case classes expect
      .select(
        struct($"Mac", $"SrcIP", $"DestIP", $"DestPort", $"L4Protocol", $"Direction", $"Status", $"key") as "FlowI",
        struct($"minFlowTime", $"maxFlowTime", $"flowStartedCount") as "FlowS")

    inputFlowRecordsAgg.as[InputFlowV1]
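To check the fix end to end, here is a self-contained sketch that wraps the corrected aggregation in getReducedFlowR and runs it on two made-up records sharing the key "k1". Assumptions: a local SparkSession, IPAddress modeled as a case class with a single bytes: Array[Byte] field (inferred from the printed schema, not shown in the question), and hypothetical sample values and object name ReducedFlowSketch:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions._

// Assumption: IPAddress is not shown in the question; this shape is inferred
// from the `bytes: binary` struct in the printed schema.
case class IPAddress(bytes: Array[Byte])
case class FlowI(Mac: String, SrcIP: IPAddress, DestIP: IPAddress, DestPort: Int,
                 L4Protocol: Byte, Direction: Byte, Status: Byte, var key: String = "")
case class FlowS(var minFlowTime: Long, var maxFlowTime: Long, var flowStartedCount: Long)
case class InputFlowV1(FlowI: FlowI, FlowS: FlowS)

object ReducedFlowSketch {
  def getReducedFlowR(inputFlowRecords: Dataset[InputFlowV1], spark: SparkSession): Dataset[InputFlowV1] = {
    import spark.implicits._
    inputFlowRecords
      .groupBy(column("FlowI.key") as "key")
      .agg(
        min("FlowS.minFlowTime") as "minFlowTime",
        max("FlowS.maxFlowTime") as "maxFlowTime",
        sum("FlowS.flowStartedCount") as "flowStartedCount",
        first("FlowI.Mac") as "Mac",
        first("FlowI.SrcIP") as "SrcIP",
        first("FlowI.DestIP") as "DestIP",
        first("FlowI.DestPort") as "DestPort",
        first("FlowI.L4Protocol") as "L4Protocol",
        first("FlowI.Direction") as "Direction",
        first("FlowI.Status") as "Status")
      // Rebuild the two struct columns that InputFlowV1 expects
      .select(
        struct($"Mac", $"SrcIP", $"DestIP", $"DestPort", $"L4Protocol", $"Direction", $"Status", $"key") as "FlowI",
        struct($"minFlowTime", $"maxFlowTime", $"flowStartedCount") as "FlowS")
      .as[InputFlowV1]
  }

  val spark: SparkSession =
    SparkSession.builder().master("local[1]").appName("reduced-flow-sketch").getOrCreate()
  import spark.implicits._

  // Hypothetical sample data: two records with the same key "k1"
  private val ip = IPAddress(Array[Byte](10, 0, 0, 1))
  val input: Dataset[InputFlowV1] = Seq(
    InputFlowV1(FlowI("aa:bb", ip, ip, 443, 6, 0, 1, "k1"), FlowS(100L, 200L, 1L)),
    InputFlowV1(FlowI("aa:bb", ip, ip, 443, 6, 0, 1, "k1"), FlowS(50L, 300L, 2L))
  ).toDS()

  val reduced: Array[InputFlowV1] = getReducedFlowR(input, spark).collect()

  def main(args: Array[String]): Unit = {
    reduced.foreach(r => println(s"${r.FlowI.key} -> ${r.FlowS}"))
    spark.stop()
  }
}
```

With these inputs the single output row should carry the smallest minFlowTime (50), the largest maxFlowTime (300) and the summed flowStartedCount (3).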