Unable to iterate over the list of keys obtained by converting a DStream to a List while using Spark Streaming with Kafka

Below is my code for Spark Streaming with Kafka. I am trying to get the keys of each batch as a DStream and then convert them to a List, so that I can iterate over the list and write the data for each key to an HDFS folder named after that key.

The key is basically Schema.Table_name.
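Each key (Schema.Table_name) is used directly as a folder name, so the output directory for one key and one batch can be built by a small pure function. The sketch below mirrors the path concatenated in the write call further down (base path, key, location, day, hour, timestamp). The object name OutputPaths, the "yyyy-MM-dd" and "HH" patterns, the UTC zone and "/" as the value of PATH_SEPERATOR are all assumptions, because the question does not show how date_today, date_today_hour or PATH_SEPERATOR are defined.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical helper: builds <base>/<key>/<location>/<day>/<hour>/<millis>.
// The date patterns, the UTC zone and the "/" separator are assumptions.
object OutputPaths {
  private val dayFormat  = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC)
  private val hourFormat = DateTimeFormatter.ofPattern("HH").withZone(ZoneOffset.UTC)
  private val Separator  = "/" // assumed stand-in for PATH_SEPERATOR

  // Takes the timestamp once, so day, hour and millis always describe the same instant.
  def forKey(base: String, key: String, location: String, nowMillis: Long): String = {
    val now = Instant.ofEpochMilli(nowMillis)
    Seq(base, key, location, dayFormat.format(now), hourFormat.format(now), nowMillis.toString)
      .mkString(Separator)
  }
}

// Usage (hypothetical values):
// OutputPaths.forKey("/data/out", "sales.orders", "loc1", System.currentTimeMillis())
```

Reading the clock once is a deliberate choice here: the original expression calls System.currentTimeMillis() three times, so when a write straddles an hour or day boundary the day, hour and timestamp folders could disagree.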

val ssc = new StreamingContext(sparkConf, Seconds(args(7).toLong)) // batch interval, configured as 60 seconds
val warehouseLocation="Spark-warehouse"
val spark = SparkSession.builder.config(sparkConf).getOrCreate() 
import spark.implicits._

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> conf.getString("kafka.brokers"),
  "zookeeper.connect" -> conf.getString("kafka.zookeeper"),
  "" -> conf.getString("kafka.consumergroups"),
  "auto.offset.reset" -> args(1),
  "" -> (conf.getString("kafka.autoCommit").toBoolean: java.lang.Boolean),
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "security.protocol" -> "SASL_PLAINTEXT",
  "" -> args(2),
  "max.poll.records" -> args(3),
  "" -> args(4),
  "" -> args(5))

val messages = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent,
  Subscribe[String, String](topicsSet, kafkaParams))

Extracting the keys, which gives a DStream[String]:

val keys = messages.map(x => x.key())

var final_list_of_keys = List[String]()

Converting it into a list and updating the var final_list_of_keys:

keys.foreachRDD(rdd => {
  val df_keys =
  val comma_separated_keys = df_keys.distinct().collect().mkString("").replace("[", "").replace("]", ",")
  final_list_of_keys = comma_separated_keys.split(",").toList

Now trying to iterate over the list.

  for (i <- final_list_of_keys) {

    val message1 = messages.filter(x => x.key().toString().equals(i)).map(x => x.value()).persist(StorageLevel.MEMORY_ONLY) //.toString())

    message1.foreachRDD((rdd, batchTime) => {

      if (!rdd.isEmpty()) {

        val df1 =  //.withColumn("pharmacy_location", lit(args(6)))

        val df2 = df1.withColumn("message", struct(struct($"*", lit(args(6)).as("pharmacy_location")).alias("data"), struct($"message.headers.*").as("headers"))).persist(StorageLevel.MEMORY_ONLY)

        val df3 = df2.drop("headers").drop("messageSchema").drop("messageSchemaId").persist(StorageLevel.MEMORY_ONLY)

        df3.coalesce(1).write.json(conf.getString("hdfs.streamoutpath1") + PATH_SEPERATOR + i + PATH_SEPERATOR + args(6) + PATH_SEPERATOR + date_today.format(System.currentTimeMillis())
          + PATH_SEPERATOR + date_today_hour.format(System.currentTimeMillis()) + PATH_SEPERATOR + System.currentTimeMillis())
      }
    })
  }
})

messages.foreachRDD { rdd =>
  try {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) // commit this batch's offsets back to Kafka
  } catch {
    case e: BlockMissingException => e.printStackTrace()
    case e: IOException => e.printStackTrace()
    case e: Throwable => e.printStackTrace()
  }
}
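The mkString / replace / split chain in the keys.foreachRDD block above can be checked without a cluster. This sketch models a one-column Row by its string form (Spark's Row.toString renders a single value as "[value]") and compares it with keeping the keys as strings. The object and method names are illustrative only. Two edge cases show why the string round trip is fragile: an empty batch yields a list containing one empty string rather than an empty list, and a key containing a comma is split in two.

```scala
// Illustrative only: models the question's key-list parsing on plain Scala collections.
object KeyListParsing {
  // Stand-in for the string form of a one-column Spark Row, e.g. "[sales.orders]".
  private def rowString(key: String): String = s"[$key]"

  // Mirrors df_keys.distinct().collect().mkString("").replace("[","").replace("]",",").split(",").toList
  def viaStringRoundTrip(keys: Seq[String]): List[String] =
    keys.distinct.map(rowString).mkString("")
      .replace("[", "").replace("]", ",")
      .split(",").toList

  // Keeps the keys as strings: nothing to parse, so nothing to mis-parse.
  def direct(keys: Seq[String]): List[String] = keys.distinct.toList
}
```

In the streaming job itself, the RDD[String] handed to keys.foreachRDD can be collected directly with rdd.distinct().collect().toList, which skips both the DataFrame and the string parsing.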


But I get the error: "Adding new inputs, transformations, and output operations after starting a context is not supported".

When I move the for loop over the list outside keys.foreachRDD, the list is never updated and remains empty.
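Both symptoms come from the same rule: DStream operations such as filter and foreachRDD are registered while the job is being set up, and the function passed to foreachRDD only runs later, once per batch, after ssc.start(). A for loop outside keys.foreachRDD therefore runs once at setup time, before any batch has filled the list, while a loop inside it tries to register new DStream operations on a context that has already started. The toy model below is a hypothetical, Spark-free simulation of that rule; ToyStreamingContext, foreachBatch and ToyDemo are invented names, not Spark APIs.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy model, NOT Spark's API: output operations are registered before
// start() and are only executed, once per batch, after start().
final class ToyStreamingContext {
  private val outputOps = ArrayBuffer.empty[Seq[String] => Unit]
  private var started = false

  // Analogue of dstream.foreachRDD(f): stores f, does not run it.
  def foreachBatch(f: Seq[String] => Unit): Unit = {
    if (started) throw new IllegalStateException("toy: cannot add output operations after start()")
    outputOps += f
  }

  // Analogue of ssc.start(): every batch is handed to every registered operation.
  def start(batches: Seq[Seq[String]]): Unit = {
    started = true
    batches.foreach(batch => outputOps.foreach(op => op(batch)))
  }
}

object ToyDemo {
  // Loop outside foreachBatch: it sees the list as it is at setup time, i.e. empty.
  def loopOutside(batches: Seq[Seq[String]]): List[String] = {
    val ctx = new ToyStreamingContext
    var finalListOfKeys = List.empty[String]
    ctx.foreachBatch(batch => finalListOfKeys = batch.distinct.toList)
    val seenAtSetup = finalListOfKeys // what a for loop placed here would iterate over
    ctx.start(batches)
    seenAtSetup
  }

  // Registering a new operation from inside a running batch fails.
  def registerInside(batches: Seq[Seq[String]]): Either[String, Unit] = {
    val ctx = new ToyStreamingContext
    try {
      ctx.foreachBatch(_ => ctx.foreachBatch(_ => ()))
      ctx.start(batches)
      Right(())
    } catch {
      case e: IllegalStateException => Left(e.getMessage)
    }
  }

  // Doing all per-key work inside a single foreachBatch works for every batch.
  def allInside(batches: Seq[Seq[String]]): List[List[String]] = {
    val ctx = new ToyStreamingContext
    val keysPerBatch = ArrayBuffer.empty[List[String]]
    ctx.foreachBatch(batch => keysPerBatch += batch.distinct.toList)
    ctx.start(batches)
    keysPerBatch.toList
  }
}
```

The allInside shape is the one that works in Spark too: a single output operation on messages, with the per-key loop running over plain RDDs inside it.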

Can someone please advise how I can restructure this code so that I get the keys in a list and then go over them to put the data in the correct directory?

From my research, I found a similar post, but I was unable to gather any solution from it.

Also, since I am using map and filter inside foreachRDD and then another foreachRDD inside it, this may cause a problem. Refer to this post: Post with similar code


  • Below is the code that solves the problem:

    val messages = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent,
      Subscribe[String, String](topicsSet, kafkaParams)).persist(StorageLevel.MEMORY_ONLY)

    // foreachRDD calls this function once per batch (on the driver) with the batch's RDD and its batch time
    messages.foreachRDD((rdd, batchTime) => {
      // Kafka sends data as key/value pairs, and the key is the table name.
      // First get all the distinct keys in this batch (e.g. this batch had 5 tables).
      val keys = rdd.map(x => x.key()).distinct().collect()

      // For each table name x, keep only the records of this RDD whose key equals x.
      // rddList now holds (table name, records for that table) pairs.
      val rddList = keys.map(x => (x, rdd.filter(y => y.key().equals(x))))

      // rddList is a local Array, so this foreach is not parallel: it goes one table at a time.
      rddList.foreach(tuple => {
        val tableName = tuple._1.toString() // tuple._1 is the table name
        // tuple._2 holds the complete key/value records; keep only the values, which are written to HDFS
        val tableRdd = tuple._2.map(x => x.value()).persist(StorageLevel.MEMORY_ONLY) // .toDF()
        // Filtering the DStream here instead, as below, is what raised the error in the question:
        // val tableRdd = messages.filter(x => x.key().toString().equals(tableName)).map(x => x.value()).persist(StorageLevel.MEMORY_ONLY)
        /* Your logic */
      })
    })
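To check the per-key layout without a cluster, the answer's loop (distinct keys, then one filter per key, then a write into a folder named after the key) can be run on a local Seq. This is a Spark-free analogue, not the Spark code itself: Record is a hypothetical stand-in for ConsumerRecord, and a plain file write takes the place of the question's DataFrame transformations and df3.coalesce(1).write.json(...) call. As in Spark, each key costs one pass over the batch, which is a good reason to persist the batch before looping, as the answer does.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

// Hypothetical stand-in for a Kafka ConsumerRecord[String, String].
final case class Record(key: String, value: String)

object PerKeyWriter {
  // Local analogue of the answer's foreachRDD body for one batch.
  // Returns the file written for each key.
  def writeBatch(batch: Seq[Record], baseDir: Path, batchMillis: Long): Map[String, Path] = {
    val keys = batch.map(_.key).distinct // like rdd.map(x => x.key()).distinct().collect()
    keys.map { key =>
      // like rdd.filter(y => y.key().equals(key)).map(x => x.value())
      val values = batch.filter(_.key == key).map(_.value)
      // One folder per key (Schema.Table_name), one sub-folder per batch.
      val dir = Files.createDirectories(baseDir.resolve(key).resolve(batchMillis.toString))
      val file = dir.resolve("part-00000.json")
      Files.write(file, values.mkString("\n").getBytes(StandardCharsets.UTF_8))
      key -> file
    }.toMap
  }
}
```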