python, airflow, airflow-xcom

Airflow XCom error when querying a SQL datetime field


I need some help. I'm new to Airflow and I'm trying to query data from an Oracle database.
My code looks like this:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.oracle.operators.oracle import OracleOperator

default_args = {
    'owner': 'owner_name',
    'retries': 5,
    'retry_delay': timedelta(minutes=10)
}

with DAG(
    'dag_name',
    default_args=default_args,
    schedule_interval='0 1 * * *',
    start_date=datetime(2023, 4, 10),
) as dag:
    task = OracleOperator(
        task_id='task_name',
        oracle_conn_id='connection_name',
        sql="""
            select
                some_datetime_field,
                some_data_field
            from 
                some_table
        """
    )
    task

But I get this error:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 2305, in xcom_push
    session=session,
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/xcom.py", line 240, in set
    map_index=map_index,
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/xcom.py", line 627, in serialize_value
    return json.dumps(value, cls=XComEncoder).encode("UTF-8")
  File "/usr/local/lib/python3.7/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/json.py", line 176, in encode
    return super().encode(o)
  File "/usr/local/lib/python3.7/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/lib/python3.7/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/json.py", line 153, in default
    CLASSNAME: o.__module__ + "." + o.__class__.__qualname__,
AttributeError: 'datetime.datetime' object has no attribute '__module__'

After I delete the some_datetime_field line from the query, it works fine, so I'm not sure what's wrong. How can I pass a SQL datetime via XCom to another task?


Solution

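  • Try converting the date field to a string in your query, for example with Oracle's TO_CHAR function: https://www.oracletutorial.com/oracle-date-functions/oracle-to_char/

    The query currently returns a datetime.datetime value, and the traceback shows XCom serializing the pushed result with json.dumps. Anything pushed to XCom has to be JSON serializable, and the encoder fails on the datetime value here.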

    Additional reference: https://github.com/apache/airflow/discussions/24881
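
A minimal sketch of both options, under stated assumptions. The `sql` string shows the TO_CHAR approach applied to the query from the question; the format mask 'YYYY-MM-DD HH24:MI:SS' is only an example choice. The rest uses plain `json` from the standard library rather than Airflow itself, with hypothetical `rows` and a hypothetical helper `to_json_safe`, to illustrate why a datetime value fails to encode and how a string does not.

```python
import json
from datetime import datetime

# Option 1: convert in the query, so the database returns a string.
# The format mask is an example; pick whatever the downstream task expects.
sql = """
    select
        TO_CHAR(some_datetime_field, 'YYYY-MM-DD HH24:MI:SS') as some_datetime_field,
        some_data_field
    from
        some_table
"""

# Hypothetical rows, shaped like what a query on a datetime column might return.
rows = [(datetime(2023, 4, 10, 1, 0, 0), "some_data")]


def to_json_safe(rows):
    """Option 2 (hypothetical helper): turn datetime values into ISO 8601 strings."""
    return [
        [value.isoformat() if isinstance(value, datetime) else value for value in row]
        for row in rows
    ]


# The standard JSON encoder rejects datetime objects.
try:
    json.dumps(rows)
except TypeError as exc:
    print(f"json.dumps failed: {exc}")

# Once the datetimes are strings, encoding succeeds.
payload = json.dumps(to_json_safe(rows))
print(payload)
```

A downstream task that needs a real datetime again can parse the string back, for example with `datetime.fromisoformat` for the ISO form or `datetime.strptime` for a custom mask.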