I am trying to create an XmlRelation based on the value of a ConsumerRecord.
val value = record.value()
logger.info(".processRecord() : Value ={}", value)
if (value != null) {
  // This is the line that fails with the NullPointerException
  val rdd = spark.sparkContext.parallelize(List(new String(value)))
}
However, when I try to create an RDD based on the value, I get a NullPointerException.
org.apache.spark.SparkException: Job aborted due to stage failure:
Is this because I cannot create an RDD, since the SparkContext is not available on the worker nodes? Obviously I cannot send this data back to the driver, as this is an infinite stream.
What alternatives do I have?
The other alternative I can think of is to write the record data, along with the header info, to another topic and have a second streaming job process it.
The ConsumerRecord value I am getting is a String (XML), and I want to parse it into an RDD using an existing schema and process it further.
Thanks, Sateesh
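A note on the likely cause of the NullPointerException: as far as I know, SparkSession is serializable but its sparkContext field is marked @transient, so a session captured by code that runs on an executor arrives there with a null context. The sketch below imitates that with plain Java serialization and no Spark at all. FakeContext, FakeSession and TransientDemo are hypothetical stand-ins invented for this illustration, not Spark classes.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a driver-only resource such as SparkContext.
class FakeContext {
  def parallelize(xs: List[String]): List[String] = xs
}

// Hypothetical stand-in for SparkSession: serializable, but the context is @transient,
// so it is skipped when the object is serialized.
class FakeSession(@transient val context: FakeContext) extends Serializable

object TransientDemo {
  // Round-trip an object through Java serialization, roughly what happens to
  // objects captured by a closure that is shipped to an executor.
  def roundTrip[A <: Serializable](a: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(a)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val onDriver = new FakeSession(new FakeContext)
    val onExecutor = roundTrip(onDriver)

    // The original object still has its context.
    println(s"driver copy has a context: ${onDriver.context != null}")
    // The deserialized copy lost the transient field.
    println(s"executor copy has a context: ${onExecutor.context != null}")
    // Calling onExecutor.context.parallelize(...) here would throw a NullPointerException.
  }
}
```

If that is what is happening, the fix is to keep every call that needs the SparkContext (such as parallelize) in code that runs on the driver, and to use DataFrame or RDD transformations for the per-record work.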
I was able to make it work with the following code:
// Keep only the non-null XML payloads from the current batch
val xmlStringDF: DataFrame = batchDF.selectExpr("value").filter($"value".isNotNull)
logger.info(".convert() : xmlStringDF Schema ={}", xmlStringDF.schema.treeString)
val rdd: RDD[String] = xmlStringDF.as[String].rdd
logger.info(".convert() : Before converting String DataFrame into XML DataFrame")
// Build the relation from the existing RDD of XML strings instead of a file location
val relation = XmlRelation(
  () => rdd,
  None,
  parameters.toMap,
  xmlSchema)(spark.sqlContext)
val xmlDF = spark.baseRelationToDataFrame(relation)
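For anyone wondering where batchDF comes from: the sketch below assumes it is the per-batch DataFrame handed to foreachBatch in Structured Streaming. That function is invoked on the driver once per micro-batch, which would explain why spark.sqlContext is usable there. XmlBatchWiring, xmlStrings, start and the convert parameter are hypothetical names for illustration; convert stands in for the XmlRelation code above, and the println is only a placeholder for the real downstream processing.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

object XmlBatchWiring {
  // Keep only non-null payloads, cast to String. Plain DataFrame code, no SparkContext needed.
  def xmlStrings(batchDF: DataFrame): DataFrame =
    batchDF.selectExpr("CAST(value AS STRING) AS value").filter("value IS NOT NULL")

  // `stream` is assumed to be a streaming DataFrame with a `value` column (e.g. read from Kafka).
  // `convert` is a hypothetical hook that turns the XML strings into a parsed DataFrame.
  def start(stream: DataFrame)(convert: DataFrame => DataFrame): StreamingQuery = {
    // Typing the function explicitly avoids the overload ambiguity some Scala versions
    // report when a lambda is passed to foreachBatch directly.
    val perBatch: (DataFrame, Long) => Unit = (batchDF, batchId) => {
      val xmlDF = convert(xmlStrings(batchDF))
      // Placeholder for real processing of the parsed batch.
      println(s"batch $batchId: ${xmlDF.count()} parsed rows")
    }
    stream.writeStream.foreachBatch(perBatch).start()
  }
}
```

With this shape, nothing has to be sent to a second topic: each micro-batch is converted and processed inside the same streaming job.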