scala, apache-spark, dataset

Multiplying an Event case class based on nested device/OS IDs


I am processing a DataFrame and converting it into a Dataset[Event] using the Event case class. However, there are nested IDs for which I need to multiply the events, based on flattening the nested device:os mappings.

I am able to return the Event case class at the Kafka event level, but I am not sure how to multiply the events.

Kafka incoming Event:

{
  "partition": 1,
  "key": "34768_20220203_MFETP501",
  "offset": 1841543,
  "createTime": 1646041475348,
  "topic": "topic_int",
  "publishTime": 1646041475344,
  "errorCode": 0,
  "userActions": {
    "productId": "3MFETP501",
    "createdDate": "2022-02-26T11:19:35.786Z",
    "events": [
      {
        "GUID": "dbb1-f38b-f7f0-44af-90da-80179412f89c",
        "eventDate": "2022-02-26T11:19:35.786Z",
        "familyId": 2010,
        "productTypeId": 1004678,
        "serialID": "890479804",
        "productName": "MFE Total Protection 2021 Family Pack",
        "features": {
          "mapping": [
            {
              "deviceId": 999795,
              "osId": [
                100
              ]
            },
            {
              "deviceId": 987875
              "osId": [
                101
              ]
            }
          ]
        }
      }
    ]
  }
}

The expected output, as instances of the Event case class:

Event("3MFETP501","1004678","2010","3MFETP501:890479804","MFE Total Protection 2021 Family Pack","999795_100", Map("targetId"->"999795_100") )
Event("3MFETP501","1004678","2010","3MFETP501:890479804","MFE Total Protection 2021 Family Pack","987875_100", Map("targetId"->"987875_100") )
case class Event(
                    productId: String,
                    familyId: String,
                    productTypeId: String,
                    key: String,
                    productName: String,
                    deviceOS:String, 
                    var featureMap: mutable.Map[String, String])
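To make the required fan-out concrete: each event should produce one Event per (deviceId, osId) combination found in features.mapping. A minimal plain-Scala sketch of that flattening (no Spark, values taken from the sample payload above):

// Plain-Scala sketch: flatten the nested deviceId/osId mapping into one
// "deviceId_osId" string per combination -- one Event each.
val mapping = List((999795L, List(100L)), (987875L, List(101L)))

val deviceOsList: List[String] = mapping.flatMap { case (deviceId, osIds) =>
  osIds.map(osId => s"${deviceId}_$osId")
}
// deviceOsList == List("999795_100", "987875_101")

My current attempt: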




val finalDataset: Dataset[Event] = inputDataFrame.flatMap(
  row => {

    val productId = row.getAs[String]("productId")
    val userActions = row.getAs[Row]("userActions")
    val userEvents: mutable.Seq[Row] = userActions.getAs[mutable.WrappedArray[Row]]("events")

    val processedEvents = userEvents.map(
      event => {

        val productTypeId = event.getAs[Int]("productTypeId")
        val familyId = event.getAs[String]("familyId")
        val features = event.getAs[mutable.WrappedArray[Row]]("features")
        val serialId = event.getAs[String]("serialId")
        val key = productId + ":" + serialId
        val featureMap = mutable.Map[String, String]().withDefaultValue(null)

        val device_os_list = List("999795_100", "987875_101")
        // The feature map is per device_os (example: "targetId" -> "999795_100" for 999795_100)

        if (familyId == 2010) {
          val a: Option[List[String]] = ??? // flatten the deviceId, osId ...

          a.get.map(i => {
            val key: String = ??? // methodToCombineDeviceIdAndOsId
            val featureMapping: mutable.Map[String, String] = ??? // getFeatureMapForIndividualKey

            Event(productId, productTypeId, familyId, key, productName, device_os, feature) // ---> This returns **List[Event]**
          })
        } else {
          Event(productId, productTypeId, familyId, key, productName, device_os, feature) // --> This returns **Event**. THIS WORKS
        }
      }
    )
  }
)



Solution

  • I did not implement it exactly the same way, but I think the logic will be easy to follow and adapt to your case.

    I created a JSON file, kafka.json, with the following content (your event):

    [{
      "partition": 1,
      "key": "34768_20220203_MFETP501",
      "offset": 1841543,
      "createTime": 1646041475348,
      "topic": "topic_int",
      "publishTime": 1646041475344,
      "errorCode": 0,
      "userActions": {
        "productId": "3MFETP501",
        "createdDate": "2022-02-26T11:19:35.786Z",
        "events": [
          {
            "GUID": "dbb1-f38b-f7f0-44af-90da-80179412f89c",
            "eventDate": "2022-02-26T11:19:35.786Z",
            "familyId": 2010,
            "productTypeId": 1004678,
            "serialID": "890479804",
            "productName": "MFE Total Protection 2021 Family Pack",
            "features": {
              "mapping": [
                {
                  "deviceId": 999795,
                  "osId": [
                    100
                  ]
                },
                {
                  "deviceId": 987875,
                  "osId": [
                    101
                  ]
                }
              ]
            }
          }
        ]
      }
    }]
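
    One thing to keep in mind before reading the code: Spark's JSON reader infers whole numbers as LongType (bigint), so fields such as familyId, productTypeId, deviceId, and the osId elements come back as Long, and reading them with getAs[Int] fails at runtime with a ClassCastException. You can confirm the inferred schema on the same file:

      // Inspect the inferred schema; the numeric JSON fields show up as long.
      spark.read.option("multiline", "true").json("/absolute_path_to_kafka.json").printSchema()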
    

    Please find below the first solution, based on flatMap and a for comprehension.

      case class Event(
          productId: String,
          familyId: String,
          productTypeId: String,
          key: String,
          productName: String,
          deviceOS: String,
          featureMap: Map[String, String])
    
      import org.apache.spark.sql.{Dataset, Row, SparkSession}
    
      import scala.collection.mutable
    
      val spark = SparkSession
          .builder
          .appName("StructuredStreaming")
          .master("local[*]")
          .getOrCreate()
    
      private val inputDataFrame = spark.read.option("multiline", "true").format("json").load("/absolute_path_to_kafka.json")
    
    
      import spark.implicits._
    
      val finalDataset: Dataset[Event] = inputDataFrame.flatMap(
        row => {
    
          val userActions = row.getAs[Row]("userActions")
          val productId = userActions.getAs[String]("productId")
    
          val userEvents = userActions.getAs[mutable.WrappedArray[Row]]("events")
          for (event <- userEvents;
               familyId = event.getAs[Long]("familyId").toString;
               productTypeId = event.getAs[Long]("productTypeId").toString;
               serialId = event.getAs[String]("serialID");
               productName = event.getAs[String]("productName");
               key = s"$productId:$serialId";
               features = event.getAs[Row]("features");
               mappings = features.getAs[mutable.WrappedArray[Row]]("mapping");
               mappingRow <- mappings;
               deviceId = mappingRow.getAs[Long]("deviceId");
               osIds = mappingRow.getAs[mutable.WrappedArray[Long]]("osId");
               osId <- osIds;
               deviceOs = deviceId + "_" + osId
               ) yield Event(productId, familyId, productTypeId, key, productName, deviceOs, Map("target" -> deviceOs))
        }
    
      )
    
      finalDataset.foreach(e => println(e))
    
    //  Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,999795_100,Map(target -> 999795_100))
    //  Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,987875_101,Map(target -> 987875_101))
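
    A note on the for comprehension above: only the generator clauses (event <- userEvents, mappingRow <- mappings, osId <- osIds) multiply the output, one element per combination; the = clauses are plain per-iteration bindings. A tiny standalone illustration with hypothetical values:

      // Generators (<-) fan out; definitions (=) are evaluated once per iteration.
      val pairs = for (x <- Seq(1, 2); label = s"x=$x"; y <- Seq("a", "b")) yield s"$label$y"
      // pairs == Seq("x=1a", "x=1b", "x=2a", "x=2b")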
    

    You can also solve this task using the select, withColumn, explode, and concat functions.

      case class Event(
          productId: String,
          familyId: String,
          productTypeId: String,
          key: String,
          productName: String,
          deviceOS: String,
          featureMap: Map[String, String])
    
      import org.apache.spark.sql.{Dataset, SparkSession}
      import org.apache.spark.sql.functions.{col, explode, concat, lit, map}
    
      val spark = SparkSession
          .builder
          .appName("StructuredStreaming")
          .master("local[*]")
          .getOrCreate()
    
      private val inputDataFrame = spark.read.option("multiline", "true").format("json").load("/absolute_path_to_kafka.json")
    
      val transformedDataFrame = inputDataFrame
          .select(col("userActions.productId").as("productId"),
            explode(col("userActions.events")).as("event"))
          .select(col("productId"),
            col("event.familyId").as("familyId"),
            col("event.productTypeId").as("productTypeId"),
            col("event.serialID").as("serialID"),
            col("event.productName").as("productName"),
            explode(col("event.features.mapping")).as("features")
          )
          .select(
            col("productId"),
            col("familyId"),
            col("productTypeId"),
            col("serialID"),
            col("productName"),
            col("features.deviceId").as("deviceId"),
            explode(col("features.osId")).as("osId")
          )
          .withColumn("key", concat(col("productId"), lit(":"), col("serialID")))
          .withColumn("deviceOS", concat(col("deviceId"), lit("_"), col("osId")))
          .withColumn("featureMap", map(lit("target"), col("deviceOS")))
    
      import spark.implicits._
    
      private val result: Dataset[Event] = transformedDataFrame.as[Event]
      result.foreach(e => println(e))
    
    //  Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,999795_100,Map(target -> 999795_100))
    //  Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,987875_101,Map(target -> 987875_101))
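
    Note that transformedDataFrame still carries extra columns (serialID, deviceId, osId) and numeric types; as[Event] tolerates the extra columns and up-casts the numeric familyId/productTypeId to String, which is why the conversion above works. If you prefer to be explicit, you can select only the Event fields first (a small optional tweak; resultExplicit is just a hypothetical name):

      val resultExplicit: Dataset[Event] = transformedDataFrame
          .select("productId", "familyId", "productTypeId", "key", "productName", "deviceOS", "featureMap")
          .as[Event]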
    

    Finally, here is an option to customize the response based on the value of one of the fields. I replaced the for comprehension with map/flatMap here, so you can return one or several events depending on the familyId. I also customized the JSON a little to show more cases in the result.

    New json:

    [{
      "partition": 1,
      "key": "34768_20220203_MFETP501",
      "offset": 1841543,
      "createTime": 1646041475348,
      "topic": "topic_int",
      "publishTime": 1646041475344,
      "errorCode": 0,
      "userActions": {
        "productId": "3MFETP501",
        "createdDate": "2022-02-26T11:19:35.786Z",
        "events": [
          {
            "GUID": "dbb1-f38b-f7f0-44af-90da-80179412f89c",
            "eventDate": "2022-02-26T11:19:35.786Z",
            "familyId": 2010,
            "productTypeId": 1004678,
            "serialID": "890479804",
            "productName": "MFE Total Protection 2021 Family Pack",
            "features": {
              "mapping": [
                {
                  "deviceId": 999795,
                  "osId": [
                    100,
                    110
                  ]
                },
                {
                  "deviceId": 987875,
                  "osId": [
                    101
                  ]
                }
              ]
            }
          },
          {
            "GUID": "1111-2222-f7f0-44af-90da-80179412f89c",
            "eventDate": "2022-03-26T11:19:35.786Z",
            "familyId": 2011,
            "productTypeId": 1004679,
            "serialID": "890479805",
            "productName": "Product name",
            "features": {
              "mapping": [
                {
                  "deviceId": 999796,
                  "osId": [
                    103
                  ]
                },
                {
                  "deviceId": 987877,
                  "osId": [
                    104
                  ]
                }
              ]
            }
          }
        ]
      }
    }]
    

    Please find the code below:

      case class Event(
          productId: String,
          familyId: String,
          productTypeId: String,
          key: String,
          productName: String,
          deviceOS: String,
          featureMap: Map[String, String])
    
      import org.apache.spark.sql.{Dataset, Row, SparkSession}

      import scala.collection.mutable
    
      val spark = SparkSession
          .builder
          .appName("StructuredStreaming")
          .master("local[*]")
          .getOrCreate()
    
      private val inputDataFrame = spark.read.option("multiline", "true").format("json").load("/absolute_path_to_kafka.json")
    
      import spark.implicits._
    
      val finalDataset: Dataset[Event] = inputDataFrame.flatMap(
        row => {
    
          val userActions = row.getAs[Row]("userActions")
          val productId = userActions.getAs[String]("productId")
    
          val userEvents = userActions.getAs[mutable.WrappedArray[Row]]("events")
          userEvents.flatMap(event => {
            val productTypeId = event.getAs[Long]("productTypeId").toString
            val serialId = event.getAs[String]("serialID")
            val productName = event.getAs[String]("productName")
            val key = s"$productId:$serialId"
            val familyId = event.getAs[Long]("familyId")
    
            if(familyId == 2010) {
              val features = event.getAs[Row]("features")
              val mappings = features.getAs[mutable.WrappedArray[Row]]("mapping")
              mappings.flatMap(mappingRow => {
                val deviceId = mappingRow.getAs[Long]("deviceId")
                val osIds = mappingRow.getAs[mutable.WrappedArray[Long]]("osId")
                osIds.map(osId => {
                  val deviceOs = deviceId + "_" + osId
                  Event(productId, familyId.toString, productTypeId, key, productName, deviceOs, Map("target" -> deviceOs))
                })
              })
            } else {
              Seq(Event(productId, familyId.toString, productTypeId, key, productName, "default_device_os", Map("target" -> "default_device_os")))
            }
          })
    
        }
      )
    
      finalDataset.foreach(e => println(e))
    
    //  Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,999795_100,Map(target -> 999795_100))
    //  Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,999795_110,Map(target -> 999795_110))
    //  Event(3MFETP501,2010,1004678,3MFETP501:890479804,MFE Total Protection 2021 Family Pack,987875_101,Map(target -> 987875_101))
    //  Event(3MFETP501,2011,1004679,3MFETP501:890479805,Product name,default_device_os,Map(target -> default_device_os))
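
    This branching also answers the type problem in your original attempt: inside flatMap, both branches of the if must share a collection type, which is why the single default Event is wrapped in Seq(...). In miniature, with hypothetical values:

      // Wrapping the single result in Seq gives both branches the common type
      // Seq[Int], so flatMap can flatten them.
      val out: Seq[Int] = Seq(1, 2, 3).flatMap { x =>
        if (x % 2 == 0) Seq(x * 10, x * 100) // several results for this input
        else Seq(x)                          // single result, still wrapped in Seq
      }
      // out == Seq(1, 20, 200, 3)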