I receive one Kafka message consisting of several independent JSON lines, and I want to stream this message into HDFS. The problem is that my code only saves the very first JSON line and ignores the rest.
Example of a single Kafka message (not multiple messages):
{"field": "1"}
{"field": "2"}
{"field": "3"}
Part of the Scala code:
val stream = KafkaSource.kafkaStream[String, String, StringDecoder, StringDecoder](
  streamingContext, brokers, new ZooKeeperOffsetsStore(zkQuorum, zkPath), topic)

stream.foreachRDD(rdd => {
  if (!rdd.isEmpty) {
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
    // each Kafka message becomes one RDD element, so m._2 holds all the JSON lines
    val df = spark.read.json(rdd.map(m => m._2))
    df.write.mode(SaveMode.Append).format("json").save(outputPath)
  }
})
The problem seems to lie in the rdd.map(m => m._2) part: each Kafka message arrives as a single RDD element, so all the JSON lines stay glued together in one string, and the JSON reader only picks up the first object from that string rather than all of them.
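To make the failure mode concrete, here is a small plain-Scala sketch (no Spark needed) of what one message value looks like, using the example message from above; splitting on newlines is what recovers the individual records:

```scala
// One Kafka message value: a single string holding three newline-separated JSON records.
val messageValue = "{\"field\": \"1\"}\n{\"field\": \"2\"}\n{\"field\": \"3\"}"

// rdd.map(m => m._2) keeps this as ONE element, so a per-element JSON parse
// only sees the first object. Splitting on newlines separates the records.
val records: Array[String] = messageValue.split("\n")

records.foreach(println)
// {"field": "1"}
// {"field": "2"}
// {"field": "3"}
```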
I solved it by writing the data as text instead of JSON. The main difference lies in the toDF() transformation:
stream.foreachRDD(rdd => {
  if (!rdd.isEmpty) {
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
    import spark.implicits._  // needed for toDF()
    // works as a .txt file: each message value is written out as raw text,
    // so every embedded JSON line survives
    rdd.map(m => m._2).toDF().coalesce(1)
      .write.mode(SaveMode.Append).format("text").save(outputPath)
  }
})
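If the output should stay parsed JSON rather than raw text, a variant that might work (a sketch only, not tested against the setup above) is to split each message value into lines with flatMap before handing it to the JSON reader. splitMessage is a hypothetical helper name:

```scala
// Hypothetical helper: split one Kafka message value into individual JSON lines,
// dropping any blank lines.
def splitMessage(value: String): Seq[String] =
  value.split("\n").map(_.trim).filter(_.nonEmpty).toSeq

// Sketch of the streaming side (assumes the same `stream` and `outputPath` as
// above; note that spark.read.json(RDD[String]) is deprecated in Spark 2.x
// but still available):
//
// stream.foreachRDD(rdd => {
//   if (!rdd.isEmpty) {
//     val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
//     val jsonLines = rdd.flatMap(m => splitMessage(m._2)) // one element per JSON record
//     val df = spark.read.json(jsonLines)
//     df.write.mode(SaveMode.Append).format("json").save(outputPath)
//   }
// })

println(splitMessage("{\"field\": \"1\"}\n{\"field\": \"2\"}\n{\"field\": \"3\"}"))
```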