Tags: python, pyspark, apache-spark-sql, pyspark-pandas

I want to add a new column to a dataframe with values that I get from a for loop


I've written the below code:

def func_sql(table, query):
    q = spark.sql(eval(f"f'{query}'")).collect()[0][0]
    print(q)

lst = []
for i in range(int_df.count()):
    lst.append(int_df.collect()[i][3])
# print(lst)

for x in lst:
    func_sql('table', '{}'.format(x))

It's giving the following output:

[screenshot of the printed query results]

I want to add this output under a new column in a dataframe, but I haven't been able to find a way to do that. I tried using a UDF, but that requires modifying an already existing column.

Any idea how I can do this?


Solution

  • Based on your comments, here's what I gathered:

    • lst is a list of queries, one query per element
    • func_sql() function runs the query and prints an output
    • you want the output in the query dataframe -- I'm guessing int_df is the query dataframe

    We can recreate the whole query dataframe from the int_df rows. Since you're already collecting the dataframe to fetch the queries, we'll collect it once and reuse all of its columns from that collection.

    But first, we'll update func_sql() to return the result of the query instead of printing it.

    def func_sql(table, query): 
        q = spark.sql(eval(f"f'{query}'")).collect()[0][0]
        return q
    
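    As a side note, the eval(f"f'{query}'") trick works because each stored query is written as an f-string template; eval() renders it with whatever local variables (like table) are in scope. A minimal sketch of the mechanism, using a made-up query just for illustration:

    table = 'my_table'                        # hypothetical table name
    query = 'SELECT COUNT(*) FROM {table}'    # stored query with a placeholder

    # f"f'{query}'" builds the source string "f'SELECT COUNT(*) FROM {table}'",
    # which eval() then executes as an f-string literal
    print(eval(f"f'{query}'"))
    # SELECT COUNT(*) FROM my_table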

    The part where you create lst is no longer required; we can iterate directly over the output of collect(). Now, int_df (your dataframe holding the queries) has 3 columns in this order: 'Column', 'Rule' and 'Query'. We'll use this info in the iteration, as shown below (and the sketch after the loop explains why the unpacking works).

    res_list = []
    
    for column, rule, query in int_df.collect():
        res_list.append((column, rule, query, func_sql('table', query)))
    
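    The positional unpacking in the loop works because collect() returns Row objects and pyspark.sql.Row is a subclass of tuple, so each row unpacks like a plain tuple, in the dataframe's column order. A quick standalone check:

    from pyspark.sql import Row

    r = Row(column='col1', rule='rule1', query='query1')
    column, rule, query = r    # unpacks positionally, like any tuple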

    res_list now has all the column values from int_df plus an additional value holding each query's result. This variable can now be used to create a Spark dataframe.

    res_sdf = spark.sparkContext.parallelize(res_list).toDF(['column', 'rule', 'query', 'result'])
    
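    Alternatively, spark.createDataFrame() builds the same dataframe from the list directly, without going through the RDD API:

    res_sdf = spark.createDataFrame(res_list, ['column', 'rule', 'query', 'result'])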

    To show an example of how it works, I created a dummy dataframe and will format a dummy result string instead of actually calling func_sql().

    data_ls = [
        ('col1', 'rule1', 'query1'),
        ('col2', 'rule2', 'query2')
    ]
    
    data_sdf = spark.sparkContext.parallelize(data_ls).toDF(['column', 'rule', 'query'])
    
    # +------+-----+------+
    # |column| rule| query|
    # +------+-----+------+
    # |  col1|rule1|query1|
    # |  col2|rule2|query2|
    # +------+-----+------+
    
    res_list = []
    
    for column, rule, query in data_sdf.collect():
        res_list.append((column, rule, query, '{0} result here'.format(query)))
    
    print(res_list)
    # [('col1', 'rule1', 'query1', 'query1 result here'),
    #  ('col2', 'rule2', 'query2', 'query2 result here')]
    
    res_sdf = spark.sparkContext.parallelize(res_list).toDF(['column', 'rule', 'query', 'result'])
    
    # +------+-----+------+------------------+
    # |column| rule| query|            result|
    # +------+-----+------+------------------+
    # |  col1|rule1|query1|query1 result here|
    # |  col2|rule2|query2|query2 result here|
    # +------+-----+------+------------------+
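
    One caveat: toDF() and createDataFrame() infer column types from the data, so if your query results aren't all strings you may want to pass an explicit schema instead of just column names. A sketch, assuming all four columns are strings:

    from pyspark.sql.types import StructType, StructField, StringType

    # the column types here are assumptions -- adjust them to match your query results
    schema = StructType([
        StructField('column', StringType()),
        StructField('rule', StringType()),
        StructField('query', StringType()),
        StructField('result', StringType())
    ])

    res_sdf = spark.createDataFrame(res_list, schema)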