apache-spark, apache-spark-sql, spark-streaming, flume

How to filter streamed data from Apache Flume, convert it to an RDD/DataFrame with Spark, and write it to a table


Hi, I am new to Flume/Spark/Spark Streaming. I have configured Flume with a netcat source and successfully streamed the data to Spark.

My requirement is to check the streamed data (the Flume stream) from a log file for errors, pick out every error line (any line that arrives containing the word "ERROR"), and turn those lines into a DataFrame so they can be written to Oracle.

I am getting an exception in the filter-and-convert-to-DataFrame code below. Kindly help me fix this issue.

// sc (SparkContext) and spark (SparkSession) are provided by spark-shell
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils
import spark.implicits._
val hostName = "10.90.3.78"
val port = 9999
val sparkStreamingContext = new StreamingContext(sc, Seconds(10))
val stream = FlumeUtils.createPollingStream(sparkStreamingContext, hostName, port)
val mappedlines = stream.map(e => new String(e.event.getBody.array()))
  .filter(rec => rec.contains("ERROR"))
  .map(line => line.split("ERROR"))
val arr = mappedlines.foreachRDD { status => val DF = status.toDF() }
println(arr)
stream.count().map(cnt => "Received " + cnt + " flume events.").print()
sparkStreamingContext.start()
sparkStreamingContext.awaitTermination()

Solution

  • I resolved it using foreachRDD: since foreachRDD returns Unit, the RDD has to be converted to a DataFrame (and written out) inside that block, not from its result. It worked, and I have inserted the error lines into the DB successfully.
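
For reference, here is a minimal sketch of that approach. It assumes a spark-shell session (so sc and a SparkSession named spark already exist); the JDBC URL, credentials, and the table name ERROR_LOGS are placeholders to replace with your own Oracle settings:

import java.util.Properties
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils
import spark.implicits._  // provides rdd.toDF()

val sparkStreamingContext = new StreamingContext(sc, Seconds(10))
val stream = FlumeUtils.createPollingStream(sparkStreamingContext, "10.90.3.78", 9999)

// Decode each Flume event body and keep only the lines that contain "ERROR"
val errorLines = stream
  .map(e => new String(e.event.getBody.array()))
  .filter(rec => rec.contains("ERROR"))

// Placeholder Oracle connection settings -- substitute your own
val jdbcUrl = "jdbc:oracle:thin:@//db-host:1521/ORCL"
val jdbcProps = new Properties()
jdbcProps.setProperty("user", "db_user")
jdbcProps.setProperty("password", "db_password")
jdbcProps.setProperty("driver", "oracle.jdbc.OracleDriver")

// foreachRDD returns Unit, so the DataFrame must be built and written
// inside the block; assigning its result and printing it does nothing
errorLines.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    val df = rdd.toDF("error_line")  // RDD[String] -> single-column DataFrame
    df.write.mode("append").jdbc(jdbcUrl, "ERROR_LOGS", jdbcProps)
  }
}

sparkStreamingContext.start()
sparkStreamingContext.awaitTermination()

Note that the split on "ERROR" from the question is dropped here: keeping the whole matching line gives an RDD[String], which converts cleanly to a one-column DataFrame, whereas splitting would produce arrays and discard the original line anyway.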