Question
Can I commit a SQL transaction to a DB using read_sql()?
Use Case and Background
I have a use case where I want to allow users to execute some predefined SQL and have a pandas dataframe returned. In some cases, this SQL will need to query a pre-populated table, and in other cases, this SQL will execute a function that writes to a table, and that table will then be queried. This logic is currently contained inside a method in an Airflow DAG in order to leverage database connection information accessible to Airflow using the PostgresHook; the method is eventually called in a PythonOperator task. It's my understanding through testing that the PostgresHook creates a psycopg2 connection object.
Code
from airflow.hooks.postgres_hook import PostgresHook
import pandas as pd

def create_df(job_id, other_unrelated_inputs):
    conn = job_type_to_connection(job_type)  # method that helps choose a database
    sql = open('/sql_files/job_id_{}.sql'.format(job_id))  # chooses arbitrary SQL
    sql_template = sql.read()
    hook = PostgresHook(postgres_conn_id=conn)  # connection information for alias is predefined elsewhere within Airflow

    try:
        hook_conn_obj = hook.get_conn()
        print(type(hook_conn_obj))  # <class 'psycopg2.extensions.connection'>
        # Runs SQL template with variables, but does not commit. Alternatively, have used hook.get_pandas_df(sql_template)
        df = pd.io.sql.read_sql(sql_template, con=hook_conn_obj)
    except:
        pass  # catches some errors
    return df
Problem
Currently, when executing a SQL function, this code generates a dataframe but does not commit any of the DB changes made in the SQL function. For example, if the SQL function INSERTs a row into a table, that transaction will not be committed and the row will not appear in the table.
Attempts
I've attempted a few fixes but am stuck. My latest effort was to change the autocommit attribute of the psycopg2 connection that read_sql uses in order to autocommit the transaction.
I'll admit that I haven't been able to figure out when the attributes of the connection have an impact on the execution of the SQL.
I recognize that an alternative path is to replicate some of the logic in PostgresHook.run() to commit and then add some code to push results into a dataframe, but it seems more parsimonious and easier for future support to use the methods already created, if possible.
The most analogous SO question I could find was this one, but I'm interested in an Airflow-independent solution.
EDIT
...
    try:
        hook_conn_obj = hook.get_conn()
        print(type(hook_conn_obj))  # <class 'psycopg2.extensions.connection'>
        hook_conn_obj.autocommit = True
        df = pd.io.sql.read_sql(sql_template, con=hook_conn_obj)  # Runs SQL template with variables; autocommit persists any writes
    except:
        pass  # catches some errors
    return df
This seems to work. If anyone has any commentary or thoughts on a better way to achieve this, I'm still interested in learning from a discussion.
Thank you!
read_sql won't commit because, as the method name implies, its goal is to read data, not to write it. This is a good design choice by pandas: it prevents accidental writes and allows interesting scenarios like running a procedure and reading its effects without anything being persisted. read_sql's intent is to read, not to write, and expressing intent directly is a gold-standard principle.
A more explicit way to express your intent would be to execute (with a commit) explicitly before fetchall. But because pandas offers no simple way to read from a cursor object, you would lose the peace of mind provided by read_sql and have to build the DataFrame yourself.
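For illustration, here is a minimal sketch of that more explicit route; the write_then_read helper name is just illustrative, and it assumes a plain psycopg2 connection such as the one returned by hook.get_conn(), with column names taken from cursor.description:

import pandas as pd

def write_then_read(conn, sql, params=None):
    """Execute SQL that may write, commit the transaction, and return the result set as a DataFrame."""
    with conn.cursor() as cur:
        cur.execute(sql, params)
        rows = cur.fetchall()
        columns = [col[0] for col in cur.description]  # column names reported by the driver
    conn.commit()  # persist any writes the SQL performed
    return pd.DataFrame(rows, columns=columns)

Usage would then look something like df = write_then_read(hook.get_conn(), sql_template).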
So, all in all, your solution is fine: by setting autocommit=True you're indicating that your database interactions will persist whatever they do, so there should be no accidents. It's a bit weird to read, but if you named your sql_template variable something like write_then_read_sql or explained the behavior in a docstring, the intent would be clearer.
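As a small sketch of that suggestion (the names here are purely illustrative), the intent could be made explicit with a dedicated variable name and a docstring:

import pandas as pd

def run_write_then_read_sql(conn, write_then_read_sql):
    """Execute SQL that writes to the database and then reads from it.

    autocommit is enabled deliberately: the SQL is expected to modify the
    database, and read_sql by itself will never issue a commit.
    """
    conn.autocommit = True
    return pd.io.sql.read_sql(write_then_read_sql, con=conn)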