apache-spark, apache-kafka, spark-streaming, rdd, dstream

Unable to iterate over the list of keys retrieved from converting a DStream to a List while using Spark Streaming with Kafka


Below is my code for Spark Streaming with Kafka. I am trying to get the keys for the current batch as a DStream, convert that to a List, and then iterate over the list to write the data for each key into an HDFS folder named after that key.

The key is essentially Schema.Table_name.
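For illustration, a key of the form Schema.Table_name should end up mapping to an output folder roughly like the one built in the write step further down (the literal values here are made-up stand-ins for the conf and args values used there):

    // Illustration only: made-up stand-ins for conf.getString("hdfs.streamoutpath1") and args{6}
    val PATH_SEPERATOR = "/"
    val basePath = "/data/stream-out"
    val pharmacyLocation = "LOC01"

    val key = "INVENTORY.ITEM_MASTER" // Schema.Table_name
    val outDir = basePath + PATH_SEPERATOR + key + PATH_SEPERATOR + pharmacyLocation +
      PATH_SEPERATOR + System.currentTimeMillis()
    // e.g. /data/stream-out/INVENTORY.ITEM_MASTER/LOC01/1560000000000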

val ssc = new StreamingContext(sparkConf, Seconds(args{7}.toLong)) // batch interval, configured to run every 60 seconds
val warehouseLocation="Spark-warehouse"
val spark = SparkSession.builder.config(sparkConf).getOrCreate() 
import spark.implicits._

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> conf.getString("kafka.brokers"),
  "zookeeper.connect" -> conf.getString("kafka.zookeeper"),
  "group.id" -> conf.getString("kafka.consumergroups"),
  "auto.offset.reset" -> args { 1 },
  "enable.auto.commit" -> (conf.getString("kafka.autoCommit").toBoolean: java.lang.Boolean),
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "security.protocol" -> "SASL_PLAINTEXT",
  "session.timeout.ms" -> args { 2 },
  "max.poll.records" -> args { 3 },
  "request.timeout.ms" -> args { 4 },
  "fetch.max.wait.ms" -> args { 5 })

val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

Extracting the keys, which gives a DStream[String]:

val keys = messages.map(x => x.key())

var final_list_of_keys = List[String]()

Converting it into a list and updating the var final_list_of_keys:

keys.foreachRDD(rdd => {

  val df_keys = spark.read.json(rdd).distinct().toDF().persist(StorageLevel.MEMORY_ONLY)
  df_keys.show()
  val comma_separated_keys = df_keys.distinct().collect().mkString("").replace("[", "").replace("]", ",")

  final_list_of_keys = comma_separated_keys.split(",").toList

Now trying to iterate over the list.

  for (i <- final_list_of_keys) {
    println(i)

    val message1 = messages.filter(x => x.key().toString().equals(i)).map(x => x.value()).persist(StorageLevel.MEMORY_ONLY)

    message1.foreachRDD((rdd, batchTime) => {

      if (!rdd.isEmpty()) {

        val df1 = spark.read.json(rdd).persist(StorageLevel.MEMORY_ONLY) //.withColumn("pharmacy_location", lit(args{6}))

        val df2 = df1.withColumn("message", struct(struct($"message.data.*", lit(args{6}).as("pharmacy_location")).alias("data"), struct($"message.headers.*").as("headers"))).persist(StorageLevel.MEMORY_ONLY)

        val df3 = df2.drop("headers").drop("messageSchema").drop("messageSchemaId").persist(StorageLevel.MEMORY_ONLY)

        df3.coalesce(1).write.json(conf.getString("hdfs.streamoutpath1") + PATH_SEPERATOR + i + PATH_SEPERATOR + args{6} + PATH_SEPERATOR + date_today.format(System.currentTimeMillis())
          + PATH_SEPERATOR + date_today_hour.format(System.currentTimeMillis()) + PATH_SEPERATOR + System.currentTimeMillis())

        df1.unpersist()
        df2.unpersist()
        df3.unpersist()
      }
    })

    try {
      messages.foreachRDD { rdd =>
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) // commit the processed offsets back to Kafka
      }
    }
    catch {
      case e: BlockMissingException => e.printStackTrace()
      case e: IOException => e.printStackTrace()
      case e: Throwable => e.printStackTrace()
    }
  }
})
 ssc.start()
 ssc.awaitTermination()

But I get the error: "Adding new inputs, transformations, and output operations after starting a context is not supported"

When I tried to keep the for loop over the list outside keys.foreachRDD, the list never gets updated and stays empty.
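
Presumably that is because the body of keys.foreachRDD only runs once per batch, after ssc.start(), while driver code outside it runs immediately, while the streaming job is still being defined. A minimal sketch of that ordering, reusing the keys DStream and final_list_of_keys from above:

    var final_list_of_keys = List[String]()

    keys.foreachRDD { rdd =>
      // runs later, once for every batch, only after ssc.start()
      final_list_of_keys = rdd.distinct().collect().toList
    }

    // runs right away, while the job is still being wired up,
    // so the list is still empty at this point
    for (key <- final_list_of_keys) println(key)

    ssc.start()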

Can someone please advise how I can rework this code so that I first collect the keys into a list and then go over them to put each key's data into the correct directory?

From my research I found this post:

Similar post, but I was unable to get a solution out of it.

Also, since I am using map and filter inside foreachRDD, and then another foreachRDD inside that, this can cause a problem. Refer to this post: Post with similar code


Solution

  • Below is the code that solves the problem:

    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)).persist(StorageLevel.MEMORY_ONLY)
    
    messages.foreachRDD((rdd, batchTime) => {
      // foreachRDD runs once per micro-batch; it hands us the batch's RDD along with its batch time.

      // Kafka delivers key/value pairs and the key here is the table name, so first
      // collect all the distinct keys present in this batch (e.g. this batch had 5 tables).
      val table_list = rdd.map(x => x.key()).distinct().collect()

      // For each table name x, filter the batch RDD down to the records whose key equals x.
      // rddList therefore holds (tableName, recordsForThatTable) pairs.
      val rddList = table_list.map(x => (x, rdd.filter(y => y.key().equals(x))))

      // Plain Scala foreach: go over the tables one by one, not in parallel.
      // Each tuple is (table name, RDD of records for that table).
      rddList.foreach(tuple => {

        val tableName = tuple._1.toString() // tuple._1 is the table name

        // tuple._2 holds the complete key/value records for this table; keep only the values,
        // which are what we put into HDFS.
        val tableRdd = tuple._2.map(x => x.value()).persist(StorageLevel.MEMORY_ONLY) // .toDF()

        // Earlier attempt, filtering the messages DStream here instead of the batch RDD:
        // val tableRdd = messages.filter(x => x.key().toString().equals(tableName)).map(x => x.value()).persist(StorageLevel.MEMORY_ONLY)

        println(tableName)

        /* Your logic */
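        // One possible sketch of "Your logic", adapted from the write step in the question:
        // read the RDD[String] of values as JSON and write it to a per-table HDFS directory.
        // The names used here (hdfs.streamoutpath1, PATH_SEPERATOR, args{6}) are the
        // question's own and may need adjusting.
        val tableDf = spark.read.json(tableRdd)
        tableDf.coalesce(1).write.json(
          conf.getString("hdfs.streamoutpath1") + PATH_SEPERATOR + tableName +
            PATH_SEPERATOR + args{6} + PATH_SEPERATOR + System.currentTimeMillis())
        tableRdd.unpersist()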