Tags: sql, pandas, commit, airflow, postgresql-8.2

Airflow + pandas read_sql_query() with commit


Question

Can I commit a SQL transaction to a DB using read_sql()?

Use Case and Background

I have a use case where I want to allow users to execute some predefined SQL and have a pandas DataFrame returned. In some cases, this SQL will query a pre-populated table, and in other cases it will execute a function that writes to a table, which is then queried. This logic is currently contained inside a method in an Airflow DAG in order to leverage database connection information accessible to Airflow via the PostgresHook; the method is eventually called in a PythonOperator task. It's my understanding through testing that the PostgresHook creates a psycopg2 connection object.

Code

from airflow.hooks.postgres_hook import PostgresHook
import pandas as pd

def create_df(job_id, other_unrelated_inputs):
    conn = job_type_to_connection(job_type)  # method that helps choose a database
    with open('/sql_files/job_id_{}.sql'.format(job_id)) as sql:  # chooses arbitrary SQL
        sql_template = sql.read()
    hook = PostgresHook(postgres_conn_id=conn)  # connection information for alias is predefined elsewhere within Airflow

    try:
        hook_conn_obj = hook.get_conn()
        print(type(hook_conn_obj))  # <class 'psycopg2.extensions.connection'>
        # Runs SQL template with variables, but does not commit.
        # Alternatively, have used hook.get_pandas_df(sql_template)
        df = pd.io.sql.read_sql(sql_template, con=hook_conn_obj)
    except Exception:
        # ... error handling elided ...
        raise
    return df
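
For context, the method is eventually invoked from a PythonOperator task, roughly along these lines (an illustrative sketch; the DAG name, schedule, and op_kwargs values here are placeholders):

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

dag = DAG('create_df_dag', start_date=datetime(2019, 1, 1), schedule_interval=None)

create_df_task = PythonOperator(
    task_id='create_df',
    python_callable=create_df,  # the method defined above
    op_kwargs={'job_id': 1, 'other_unrelated_inputs': None},  # placeholder inputs
    dag=dag,
)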

Problem

Currently, when executing a SQL function, this code generates a DataFrame but does not commit any of the DB changes made by the SQL function. For example, if the SQL function INSERTs a row into a table, that transaction will not be committed and the row will not appear in the table.

Attempts

I've attempted a few fixes but am stuck. My latest effort was to change the autocommit attribute of the psycopg2 connection that read_sql uses in order to autocommit the transaction.

I'll admit that I haven't been able to figure out when the attributes of the connection have an impact on the execution of the SQL.
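
For reference, my understanding of plain psycopg2 behavior is that the first execute() implicitly opens a transaction, and nothing persists until commit() is called, whereas with autocommit = True each statement is committed as soon as it runs. A minimal sketch (the table and connection parameters are placeholders):

import psycopg2

conn = psycopg2.connect(host='localhost', dbname='mydb', user='me')  # placeholder credentials
cur = conn.cursor()

# Default mode: an implicit transaction opens on the first statement;
# without commit(), the INSERT is rolled back when the connection closes.
cur.execute("INSERT INTO my_table (col) VALUES (1)")
conn.commit()

# Autocommit mode: each statement is committed as soon as it executes.
conn.autocommit = True
cur.execute("INSERT INTO my_table (col) VALUES (2)")  # persisted immediately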

I recognize that an alternative path is to replicate some of the logic in PostgresHook.run() to commit and then add some code to push the results into a DataFrame, but it seems more parsimonious, and easier for future support, to use the methods already created, if possible.
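
For example, if the SQL could be split into a writing statement and a reading query, the existing hook methods could be combined like this (a sketch; write_sql and read_sql are hypothetical variables):

hook = PostgresHook(postgres_conn_id=conn)
hook.run(write_sql, autocommit=True)  # executes and commits the writing statement
df = hook.get_pandas_df(read_sql)     # read-only query returned as a DataFrame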

The most analogous SO question I could find was this one, but I'm interested in an Airflow-independent solution.

EDIT

...
    try:
        hook_conn_obj = hook.get_conn()
        print(type(hook_conn_obj))  # <class 'psycopg2.extensions.connection'>
        hook_conn_obj.autocommit = True
        # Runs SQL template with variables; autocommit persists any writes
        df = pd.io.sql.read_sql(sql_template, con=hook_conn_obj)
    except Exception:
        # ... error handling elided ...
        raise
    return df

This seems to work. If anyone has any commentary or thoughts on a better way to achieve this, I'm still interested in learning from a discussion.

Thank you!


Solution

  • read_sql won't commit because, as the method name implies, its goal is to read data, not to write it. That's a good design choice by pandas: it prevents accidental writes and allows interesting scenarios like running a procedure and reading its effects without persisting anything. read_sql's intent is to read, not to write, and expressing intent directly is a gold-standard principle.

    A more explicit way to express your intent would be to execute (with a commit) explicitly before fetchall. But because pandas offers no simple way to read from a cursor object, you would lose the ease of mind provided by read_sql and have to create the DataFrame yourself (sketched below).

    So, all in all, your solution is fine: by setting autocommit=True you're indicating that your database interactions will persist whatever they do, so there should be no accidents. It's a bit weird to read, but if you named your sql_template variable something like write_then_read_sql, or explained it in a docstring, the intent would be clearer.
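
    As a sketch of that more explicit route (reusing hook, sql_template, and pd from the question; this assumes the statement returns rows, so cur.description is populated):

        conn = hook.get_conn()
        cur = conn.cursor()
        cur.execute(sql_template)  # runs the writing function and/or query
        conn.commit()  # persist the writes; a regular psycopg2 cursor buffers results client-side, so fetching after commit still works
        rows = cur.fetchall()
        columns = [col[0] for col in cur.description]  # column names from cursor metadata
        df = pd.DataFrame(rows, columns=columns)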