I've written the code below:
def func_sql(table, query):
    q = spark.sql(eval(f"f'{query}'")).collect()[0][0]
    print(q)

lst = []
for i in range(int_df.count()):
    lst.append(int_df.collect()[i][3])
# print(lst)

for x in lst:
    func_sql('table', '{}'.format(x))
It's giving the following output:
I want to add this output as a new column in a dataframe, but I'm not able to find a way to do that. I tried using a UDF, but that requires modifying an already existing column.
Any idea how I can do this?
Based on your comments, here's what I got:

- lst is the list of queries, where each element is a query
- func_sql() runs the query and prints an output
- int_df is the query dataframe

We can recreate the whole query dataframe using the int_df rows. Because you're collecting the dataframe for the queries anyway, we'll collect it once and use all columns from the collection.
But first, we'll update func_sql() to return the result of the query instead of printing it.
def func_sql(table, query):
    q = spark.sql(eval(f"f'{query}'")).collect()[0][0]
    return q
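A quick aside on the eval(f"f'{query}'") trick: it re-evaluates the stored query string as an f-string, so placeholders like {table} inside the query get filled in from the local variables at call time. Here's a minimal sketch without Spark, assuming your queries carry a {table} placeholder (the template below is made up for illustration):

def resolve_query(table, query):
    # re-evaluate the stored string as an f-string literal, so {table}
    # is substituted from the local variable - same idea as in func_sql()
    return eval(f"f'{query}'")

tpl = 'SELECT COUNT(*) FROM {table} WHERE col1 IS NOT NULL'  # hypothetical template
print(resolve_query('my_table', tpl))
# SELECT COUNT(*) FROM my_table WHERE col1 IS NOT NULL

Note this only works as long as the stored query doesn't itself contain single quotes.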
The part where you create lst is no longer required; we can iterate directly over the output of collect(). Now, int_df (your dataframe holding the queries) has 3 columns in this order: 'Column', 'Rule' and 'Query'. We'll use this in the iteration.
res_list = []
for column, rule, query in int_df.collect():
    res_list.append((column, rule, query, func_sql('table', '{}'.format(query))))
res_list
will now have all column values from int_df
with an additional value from the query result. This variable can now be used to create a spark dataframe.
res_sdf = spark.sparkContext.parallelize(res_list).toDF(['column', 'rule', 'query', 'result'])
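Equivalently, if you'd rather not go through an RDD, the same dataframe can be built directly from the list of tuples with createDataFrame (just an alternative, not part of the original approach):

res_sdf = spark.createDataFrame(res_list, ['column', 'rule', 'query', 'result'])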
To show an example of how it works, I created a dummy dataframe and will print the query instead of using func_sql().
data_ls = [
    ('col1', 'rule1', 'query1'),
    ('col2', 'rule2', 'query2')
]
data_sdf = spark.sparkContext.parallelize(data_ls).toDF(['column', 'rule', 'query'])
data_sdf.show()
# +------+-----+------+
# |column| rule| query|
# +------+-----+------+
# | col1|rule1|query1|
# | col2|rule2|query2|
# +------+-----+------+
res_list = []
for column, rule, query in data_sdf.collect():
    res_list.append((column, rule, query, '{0} result here'.format(query)))
print(res_list)
# [('col1', 'rule1', 'query1', 'query1 result here'),
# ('col2', 'rule2', 'query2', 'query2 result here')]
res_sdf = spark.sparkContext.parallelize(res_list).toDF(['column', 'rule', 'query', 'result'])
res_sdf.show()
# +------+-----+------+------------------+
# |column| rule| query| result|
# +------+-----+------+------------------+
# | col1|rule1|query1|query1 result here|
# | col2|rule2|query2|query2 result here|
# +------+-----+------+------------------+