Tags: json, scala, apache-spark, apache-spark-sql, rdd

How to store a JSON Lines RDD message from Kafka


I got 1 message in Kafka consisting of several independent JSON lines. I want to stream this message into HDFS. The problem is that my code only saves the very first JSON line and ignores the rest.

Example of 1 Kafka message (not multiple messages):

{"field": "1"}
{"field": "2"}
{"field": "3"}

Part of the Scala code:

    val stream = KafkaSource.kafkaStream[String, String, StringDecoder, StringDecoder](
      streamingContext, brokers, new ZooKeeperOffsetsStore(zkQuorum, zkPath), topic)

    stream.foreachRDD(rdd => {
      if (!rdd.isEmpty) {

        val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()

        val df = spark.sqlContext.read.json(rdd.map(m => m._2))

        df.write.mode(SaveMode.Append).format("json").save(outputPath)
      }
    })

The solution presumably lies in the rdd.map(m => m._2) part, where I need to map all lines, not just the first one. It seems to me that the RDD itself is already cut off and does not contain the remaining JSON lines.
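
One way to make sure every line of the message is parsed is to split each message value on newlines before handing it to the JSON reader. This is only a minimal sketch of that idea, assuming Spark 2.2+ (where read.json accepts a Dataset[String]) and the same stream and outputPath as above:

    stream.foreachRDD(rdd => {
      if (!rdd.isEmpty) {
        val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
        import spark.implicits._

        // split each Kafka message value into its independent JSON lines,
        // then let Spark parse every line as a separate record
        val jsonLines = rdd.map(m => m._2).flatMap(_.split("\n")).toDS()
        val df = spark.read.json(jsonLines)

        df.write.mode(SaveMode.Append).format("json").save(outputPath)
      }
    })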


Solution

  • I solved it by writing the data as text instead of JSON. The main difference lies in the toDF() transformation:

    stream.foreachRDD(rdd => {
      if (!rdd.isEmpty) {
        // works as a .txt file:
        rdd.map(m => m._2).toDF().coalesce(1).write.mode(SaveMode.Append).format("text").save(outputPath)
      }
    })
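
For completeness: toDF() needs the SparkSession implicits in scope (import spark.implicits._). And because every written line is an independent JSON object, the saved text files can later be read back as JSON Lines. A small example of that, reusing the spark session and outputPath from the snippets above:

    // the text output is one JSON object per line,
    // so it can be read back with the JSON reader later on
    val restored = spark.read.json(outputPath)
    restored.show()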