Tags: python, google-bigquery, jinja2, airflow, directed-acyclic-graphs

airflow jinja2.exceptions.UndefinedError: 'airflow.models.taskinstance.TaskInstance object' has no attribute 'map_index'


First, I'm new to Airflow and Python. I previously installed Airflow 2.3.3 (and some providers) on WSL2, and my DAG worked as expected without any errors. My DAG:

import os
from datetime import datetime
import logging
from airflow import models
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyDatasetOperator,
    BigQueryCreateEmptyTableOperator,
    BigQueryDeleteDatasetOperator,
    BigQueryDeleteTableOperator,
    BigQueryGetDatasetTablesOperator,
    BigQueryUpdateDatasetOperator,
    BigQueryUpdateTableOperator,
    BigQueryUpdateTableSchemaOperator,
    BigQueryUpsertTableOperator,
    BigQueryInsertJobOperator,
)
from airflow.utils.trigger_rule import TriggerRule

DAG_ID = "bigquery_dataset"
DATASET_NAME = f"dataset_{DAG_ID}"

with models.DAG(
    DAG_ID,
    schedule_interval="@once",
    start_date=datetime(2022, 8, 16),
    catchup=False,
    tags=["example", "bigquery"],
) as dag:

    create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME, exists_ok=True)
    
    create_dataset

Now I have to downgrade Airflow from 2.3.3 to 2.2.5.

Steps:

  • Fully uninstall Airflow
  • Install Airflow at the specific version
  • Install providers that are compatible with that Airflow version
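For reference, a version-pinned install is usually done against the constraints file that the Airflow project publishes per Airflow and Python version, so that providers and dependencies match. The sketch below only builds the pip command; the function name and the choice of the `google` extra are assumptions for illustration, not something taken from the steps above.

```python
from typing import List, Sequence

# URL pattern of the constraints files published in the apache/airflow repository.
CONSTRAINT_URL = (
    "https://raw.githubusercontent.com/apache/airflow/"
    "constraints-{airflow}/constraints-{python}.txt"
)


def airflow_install_command(
    airflow_version: str,
    python_version: str,
    extras: Sequence[str] = ("google",),  # assumed extra, because the DAG uses BigQuery
) -> List[str]:
    """Build (but do not run) a pip command pinned to one Airflow version."""
    extras_part = f"[{','.join(extras)}]" if extras else ""
    package = f"apache-airflow{extras_part}=={airflow_version}"
    url = CONSTRAINT_URL.format(airflow=airflow_version, python=python_version)
    return ["pip3", "install", package, "--constraint", url]


if __name__ == "__main__":
    # Python 3.8 matches the interpreter path shown in the traceback.
    print(" ".join(airflow_install_command("2.2.5", "3.8")))
```

Installing with the constraints file lets pip pick provider versions that were tested with that Airflow release, instead of the newest ones.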

I then recreated and triggered my DAG, but I got this error:

 scheduler | [2022-08-19 15:31:05,927] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'bigquery_dataset', 'create_view', 'manual__2022-08-19T08:30:58.710367+00:00', '--local', '--subdir', 'DAGS_FOLDER/test_bigquery.py']
 scheduler | [2022-08-19 15:31:07,217] {dagbag.py:500} INFO - Filling up the DagBag from /home/bao/airflow/dags/test_bigquery.py
 scheduler | Running <TaskInstance: bigquery_dataset.create_view manual__2022-08-19T08:30:58.710367+00:00 [queued]> on host DESKTOP-H8O5RAP.localdomain
 scheduler | Traceback (most recent call last):
 scheduler | File "/home/bao/.local/bin/airflow", line 8, in <module>
 scheduler | sys.exit(main())
 scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/__main__.py", line 48, in main
 scheduler | args.func(args)
 scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
 scheduler | return func(*args, **kwargs)
 scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 92, in wrapper
 scheduler | return f(*args, **kwargs)
 scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 298, in task_run
 scheduler | _run_task_by_selected_method(args, dag, ti)
 scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 105, in _run_task_by_selected_method
 scheduler | _run_task_by_local_task_job(args, ti)
 scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 163, in _run_task_by_local_task_job
 scheduler | run_job.run()
 scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 246, in run
 scheduler | self._execute()
 scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/jobs/local_task_job.py", line 78, in _execute
 scheduler | self.task_runner = get_task_runner(self)
 scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/task/task_runner/__init__.py", line 63, in get_task_runner
 scheduler | task_runner = task_runner_class(local_task_job)
 scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/task/task_runner/standard_task_runner.py", line 35, in __init__
 scheduler | super().__init__(local_task_job)
 scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/task/task_runner/base_task_runner.py", line 48, in __init__
 scheduler | super().__init__(local_task_job.task_instance)
 scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/utils/log/logging_mixin.py", line 40, in __init__
 scheduler | self._set_context(context)
 scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/utils/log/logging_mixin.py", line 54, in _set_context
 scheduler | set_context(self.log, context)
 scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/utils/log/logging_mixin.py", line 178, in set_context
 scheduler | handler.set_context(value)
 scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/utils/log/file_task_handler.py", line 60, in set_context
 scheduler | local_loc = self._init_file(ti)
 scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/utils/log/file_task_handler.py", line 283, in _init_file
 scheduler | relative_path = self._render_filename(ti, ti.try_number)
 scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/utils/log/file_task_handler.py", line 85, in _render_filename
 scheduler | return render_template_to_string(self.filename_jinja_template, context)
 scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/utils/helpers.py", line 268, in render_template_to_string
 scheduler | return render_template(template, context, native=False)
 scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/utils/helpers.py", line 263, in render_template
 scheduler | return "".join(nodes)
 scheduler | File "<template>", line 20, in root
 scheduler | File "/home/bao/.local/lib/python3.8/site-packages/jinja2/runtime.py", line 903, in _fail_with_undefined_error
 scheduler | raise self._undefined_exception(self._undefined_message)
 scheduler | jinja2.exceptions.UndefinedError: 'airflow.models.taskinstance.TaskInstance object' has no attribute 'map_index'
 scheduler | [2022-08-19 15:31:08,234] {sequential_executor.py:66} ERROR - Failed to execute task Command '['airflow', 'tasks', 'run', 'bigquery_dataset', 'create_view', 'manual__2022-08-19T08:30:58.710367+00:00', '--local', '--subdir', 'DAGS_FOLDER/test_bigquery.py']' returned non-zero exit status 1..

The log says that jinja2 tried to render the TaskInstance but failed because it has no attribute 'map_index', right? Is there a version conflict, or is something wrong with my code (or my environment)? Thank you.


Solution

  • It looks like your database schema is outdated. Or rather, the database is correct, but the installed Airflow version is wrong for some reason.

    Check the Airflow version, then either migrate your database or reinstall Airflow:

    airflow version
    
    airflow db upgrade  # If the Airflow version is correct
    pip3 install apache-airflow==2.2.5  # If not
    

    I don't know whether this command can downgrade the DB, but if you were moving from 2.2.5 to 2.3.4 it would definitely help.

    More on this at https://airflow.apache.org/docs/apache-airflow/stable/installation/upgrading.html, https://airflow.apache.org/docs/apache-airflow/stable/migrations-ref.html and https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#db
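One more thing that may be worth checking, offered as an assumption rather than a confirmed diagnosis: the traceback fails while rendering the log filename template, and Airflow 2.3 changed the default `log_filename_template` to reference `ti.map_index`. If an `airflow.cfg` generated by 2.3.3 survived the reinstall, 2.2.5 would read that template and could fail in this way. Here is a minimal sketch for checking it; the function name and the `~/airflow/airflow.cfg` location are assumptions (the path is inferred from the DAG folder shown in the log).

```python
import configparser
from pathlib import Path


def log_template_uses_map_index(cfg_text: str) -> bool:
    """Return True if [logging] log_filename_template mentions map_index."""
    # Interpolation is disabled because airflow.cfg values contain '%' and
    # Jinja braces, which configparser would otherwise try to interpret.
    parser = configparser.ConfigParser(interpolation=None, strict=False)
    parser.read_string(cfg_text)
    template = parser.get("logging", "log_filename_template", fallback="")
    return "map_index" in template


if __name__ == "__main__":
    cfg_path = Path.home() / "airflow" / "airflow.cfg"  # assumed AIRFLOW_HOME
    if cfg_path.exists():
        print(log_template_uses_map_index(cfg_path.read_text()))
    else:
        print(f"No config found at {cfg_path}")
```

If this prints True, it is probably worth replacing that setting with a template that does not reference `map_index`, or regenerating `airflow.cfg` under 2.2.5.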