I have an input DataFrame. I want to collect the values of the "path" key, for every entry whose "nullable" field is null, into a variable.
// One JSON document describing, per top-level group ("orders", "products"),
// the fields of that group and their path/type/nullable/default_value settings.
val schema_json =
  """[{"orders":{"order_id":{"path":"orderid","type":"string"},"customer_id":{"path":"customers.customerId","type":"int","default_value":"null"},"offer_id":{"path":"Offers.Offerid","type":"string"},"eligible":{"path":"eligible.eligiblestatus","type":"string","nullable":true,"default_value":"not eligible"}},"products":{"product_id":{"path":"product_id","type":"string"},"product_name":{"path":"products.productname","type":"string"}}}]"""

// Wrap the document in a one-element RDD so the JSON reader can parse it
// and infer the schema.
val schemaRdd = spark.sparkContext.parallelize(List(schema_json))
val schemaRdddf = spark.read.json(schemaRdd)

// Top-level columns to display; only "orders" is of interest here.
val schemaColumns = Seq("orders")

// Print the selected columns without truncating long cell values.
schemaRdddf
  .selectExpr(schemaColumns: _*)
  .show(false)
Expected output:
abc:List[String]= List("orderid","customers.customerId","Offers.Offerid")
Check the code below.
// DDL type used to re-parse one top-level column as a map: every field name
// (e.g. "order_id") becomes a key whose value is a struct with the four
// possible settings. Settings absent from the JSON come back as null.
val schema =
  "map<string, struct<path:string,type:string,nullable:boolean,default_value:string>>"

// Input JSON documents, one per element.
val schema_json = Seq(
  """[{"orders":{"order_id":{"path":"orderid","type":"string"},"customer_id":{"path":"customers.customerId","type":"int","default_value":"null"},"offer_id":{"path":"Offers.Offerid","type":"string"},"eligible":{"path":"eligible.eligiblestatus","type":"string","nullable":true,"default_value":"not eligible"}},"products":{"product_id":{"path":"product_id","type":"string"},"product_name":{"path":"products.productname","type":"string"}}}]"""
)

// Name of the top-level column to inspect. Lower-cased to match the JSON key
// "orders".
val columName = "Orders".toLowerCase

// `.toDS` needs `import spark.implicits._` (already in scope in spark-shell).
// The reader is fed `schema_json`, the documents defined above; the snippet
// previously referenced an undefined `input` here.
//
// The SQL expression works in four steps:
//   1. to_json + from_json: spark.read.json infers the column as a struct with
//      fixed fields; round-tripping through JSON turns it into a map so its
//      entries can be processed generically.
//   2. map_filter: keep only the entries whose `nullable` setting is null
//      (map_filter requires Spark 3.0+).
//   3. map_entries + transform: turn the remaining entries into an array of
//      their `path` values.
//   4. concat_ws: join the paths into one comma-separated string.
//
// NOTE(review): to obtain a List[String] instead of printing, replacing
// `.show(false)` with `.as[String].head.split(",").toList` should work.
spark.read
  .json(schema_json.toDS)
  .selectExpr(
    s"""
concat_ws(
',',
transform(
map_entries(
map_filter(
from_json(to_json(${columName}), '${schema}'),
(k,v) -> v.nullable is null
)
),
e -> e.value.path
)
) AS output
"""
  )
  .show(false)
+-------------------------------------------+
|output |
+-------------------------------------------+
|customers.customerId,Offers.Offerid,orderid|
+-------------------------------------------+