
Airflow Sync Dags from S3 Causes Dag Execution Error


I'm running Airflow on Kubernetes and I'm trying to sync my DAGs from S3 instead of using git-sync.

My Airflow deployment uses the apache-airflow Helm chart from https://airflow.apache.org/, installed with --version 1.5.0.

My values.yaml is set up as below:

airflow:
  env:
  - name: AIRFLOW__CORE__ENABLE_XCOM_PICKLING
    value: "True"
config:
  core:
    dags_folder: /opt/airflow/dags/repo/
    executor: KubernetesExecutor
dags:
  persistence:
    enabled: false
executor: KubernetesExecutor

scheduler:
  extraVolumeMounts:
    - name: dags-volume
      mountPath: /opt/airflow/dags/repo
  extraVolumes:
    - name: dags-volume
      emptyDir: {}
  extraContainers:
    - name: s3-sync
      image: amazon/aws-cli
      command:
        - /bin/sh
        - -c
        - >
          while true; do
            aws s3 sync s3://bucket/airflow/ /opt/airflow/dags/repo/ --exclude "*" --include "dags/*" 
            sleep 600;
          done
      env:
        - name: AWS_ACCESS_KEY_ID
          value: "key"
        - name: AWS_SECRET_ACCESS_KEY
          value: "secret"
        - name: AWS_DEFAULT_REGION
          value: "region"
      volumeMounts:
      - name: dags-volume
        mountPath: /opt/airflow/dags/repo

triggerer:
  extraVolumeMounts:
    - name: dags-volume
      mountPath: /opt/airflow/dags/repo
  extraVolumes:
    - name: dags-volume
      emptyDir: {}
  extraContainers:
    - name: s3-sync
      image: amazon/aws-cli
      command:
        - /bin/sh
        - -c
        - >
          while true; do
            aws s3 sync s3://bucket/airflow/ /opt/airflow/dags/repo/ --exclude "*" --include "dags/*" 
            sleep 600;
          done
      env:
        - name: AWS_ACCESS_KEY_ID
          value: "key"
        - name: AWS_SECRET_ACCESS_KEY
          value: "secret"
        - name: AWS_DEFAULT_REGION
          value: "region"
      volumeMounts:
      - name: dags-volume
        mountPath: /opt/airflow/dags/repo

I'm syncing the S3 bucket to a folder inside the pod. When I exec into the pod, the DAG file is in that folder as expected: /opt/airflow/dags/repo/dags/dag_manual.py

But when I try to run any DAG, I get the following error:

[2023-03-24 23:01:40,968] {dagbag.py:507} INFO - Filling up the DagBag from /opt/airflow/dags/repo/dags/dag_manual.py
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/__main__.py", line 38, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 51, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 360, in task_run
    dag = get_dag(args.subdir, args.dag_id)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 204, in get_dag
    f"Dag {dag_id!r} could not be found; either it does not exist or it failed to parse."
airflow.exceptions.AirflowException: Dag 'DAG_MANUAL' could not be found; either it does not exist or it failed to parse.

I haven't been able to figure out the problem. Any help would be appreciated.


Solution

  • I may be a bit late, but since no one has answered yet, I'll give it a try. I set up an S3 sync similar to the one you were trying to build, starting from the snippets in your values file. This is the final values file, which works with the Airflow Helm chart:

    defaultAirflowTag: "2.5.1"
    airflowVersion: "2.5.1"
    fernetKey: "keys"
    webserverSecretKey: "secretkey"
    config:
      core:
        dags_folder: /opt/airflow/dags/
    env:
      - name: "AIRFLOW__CORE__LOAD_EXAMPLES"
        value: "False"
      - name: "AIRFLOW__KUBERNETES_EXECUTOR__NAMESPACE"
        value: "airflow"
      - name: "AIRFLOW__KUBERNETES_EXECUTOR__WORKER_CONTAINER_REPOSITORY"
        value: "account_id.dkr.ecr.eu-west-2.amazonaws.com/airflow-dags"
      - name: "AIRFLOW__KUBERNETES_EXECUTOR__WORKER_CONTAINER_TAG"
        value: "15"
      - name: "AIRFLOW__KUBERNETES__DAGS_IN_IMAGE"
        value: "True"
      - name: "AIRFLOW__KUBERNETES__RUN_AS_USER"
        value: "50000"
    executor: "KubernetesExecutor"
    images:
      airflow:
        repository: account_id.dkr.ecr.eu-west-2.amazonaws.com/airflow-dags
        tag: "15"
    createUserJob:
      useHelmHooks: false
    migrateDatabaseJob:
      useHelmHooks: false
    
    
    webserver:
      replicas: 3
      extraVolumeMounts:
        - name: dags-volume
          mountPath: /opt/airflow/dags/
      extraVolumes:
        - name: dags-volume
          emptyDir: {}
      extraContainers:
        - name: s3-sync
          image: public.ecr.aws/aws-cli/aws-cli
          command:
            - /bin/sh
            - -c
            - >
              while true; do
                aws s3 sync s3://airflow-dags/ /opt/airflow/dags/
                sleep 60;
              done
          env:
            - name: AWS_ACCESS_KEY_ID
              value: "random_key"
            - name: AWS_SECRET_ACCESS_KEY
              value: "random_key"
            - name: AWS_DEFAULT_REGION
              value: "region"
          volumeMounts:
          - name: dags-volume
            mountPath: /opt/airflow/dags/
    
    
    scheduler:
      extraVolumeMounts:
        - name: dags-volume
          mountPath: /opt/airflow/dags/
      extraVolumes:
        - name: dags-volume
          emptyDir: {}
      extraContainers:
        - name: s3-sync
          image: public.ecr.aws/aws-cli/aws-cli
          command:
            - /bin/sh
            - -c
            - >
              while true; do
                aws s3 sync s3://airflow-dags/ /opt/airflow/dags/
                sleep 60;
              done
          env:
            - name: AWS_ACCESS_KEY_ID
              value: "random_key"
            - name: AWS_SECRET_ACCESS_KEY
              value: "random_key"
            - name: AWS_DEFAULT_REGION
              value: "region"
          volumeMounts:
          - name: dags-volume
            mountPath: /opt/airflow/dags/
    
    triggerer:
      extraVolumeMounts:
        - name: dags-volume
          mountPath: /opt/airflow/dags/
      extraVolumes:
        - name: dags-volume
          emptyDir: {}
      extraContainers:
        - name: s3-sync
          image: public.ecr.aws/aws-cli/aws-cli
          command:
            - /bin/sh
            - -c
            - >
              while true; do
                aws s3 sync s3://airflow-dags/ /opt/airflow/dags/
                sleep 60;
              done
          env:
            - name: AWS_ACCESS_KEY_ID
              value: "random_key"
            - name: AWS_SECRET_ACCESS_KEY
              value: "random_key"
            - name: AWS_DEFAULT_REGION
              value: "region"
          volumeMounts:
          - name: dags-volume
            mountPath: /opt/airflow/dags/
    
    
    workers:
      extraVolumeMounts:
        - name: dags-volume
          mountPath: /opt/airflow/dags/
      extraVolumes:
        - name: dags-volume
          emptyDir: {}
      extraInitContainers:
        - name: s3-sync
          image: public.ecr.aws/aws-cli/aws-cli
          command:
            - /bin/sh
            - -c
            - aws s3 sync s3://airflow-dags/ /opt/airflow/dags/
          env:
            - name: AWS_ACCESS_KEY_ID
              value: "random_key"
            - name: AWS_SECRET_ACCESS_KEY
              value: "random_key"
            - name: AWS_DEFAULT_REGION
              value: "region"
          volumeMounts:
          - name: dags-volume
            mountPath: /opt/airflow/dags/
    
    
    rbac:
      create: true
      events: true
      secrets: true
    
    ingress:
      web:
        enabled: true
        annotations:
          alb.ingress.kubernetes.io/certificate-arn: arn:aws:acm:eu-west-2:account_id:certificate/wdbhwdekaqs
          alb.ingress.kubernetes.io/group.name: airflow
          alb.ingress.kubernetes.io/listen-ports: '[{"HTTPS":443}, {"HTTP":80}]'
          alb.ingress.kubernetes.io/load-balancer-attributes: idle_timeout.timeout_seconds=4000
          alb.ingress.kubernetes.io/scheme: internal
          alb.ingress.kubernetes.io/ssl-redirect: '443'
          alb.ingress.kubernetes.io/target-type: ip
          kubernetes.io/ingress.class: alb
        path: "/"
        pathType: "Prefix"
        hosts:
        - name: "airflow.dev.example.net"
          tls:
            # Enable TLS termination for the web Ingress
            enabled: true
    

    This values file works with the following Helm chart source:

    repoURL: "https://airflow.apache.org"
    targetRevision: "1.15.0"
    chart: airflow
       
    

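    The likely cause of your issue is that you weren't syncing the DAGs from the S3 bucket to the worker pods. With the KubernetesExecutor, each task runs in its own worker pod, and that pod doesn't get the sidecar you added to the scheduler and triggerer, so the DAG file isn't there when the task tries to load it. That matches the "could not be found" error in your traceback. I solved this with init containers on the workers. You can use a sidecar instead if needed, but then you are responsible for terminating it once the main container finishes; otherwise the worker pod keeps running.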
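    Applied to the paths from your question, the worker part might look roughly like the fragment below. This is an untested sketch: it assumes your bucket layout (s3://bucket/airflow/dags/...) and your dags_folder of /opt/airflow/dags/repo/, and that your chart version renders workers.extraInitContainers into the KubernetesExecutor pod template (worth checking if you stay on chart 1.5.0).

    ```yaml
    workers:
      extraVolumes:
        - name: dags-volume
          emptyDir: {}
      extraVolumeMounts:
        - name: dags-volume
          mountPath: /opt/airflow/dags/repo
      extraInitContainers:
        # Runs once, before the task container starts, so the DAG file is
        # already on the shared emptyDir when the task parses it.
        - name: s3-sync
          image: amazon/aws-cli
          command:
            - /bin/sh
            - -c
            - aws s3 sync s3://bucket/airflow/ /opt/airflow/dags/repo/ --exclude "*" --include "dags/*"
          env:
            - name: AWS_ACCESS_KEY_ID
              value: "key"
            - name: AWS_SECRET_ACCESS_KEY
              value: "secret"
            - name: AWS_DEFAULT_REGION
              value: "region"
          volumeMounts:
            - name: dags-volume
              mountPath: /opt/airflow/dags/repo
    ```

    Your scheduler and triggerer sidecars can stay as they are; the only difference is that every task pod now gets its own one-off copy of the DAGs at startup.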

    One improvement would be to use IAM roles instead of hard-coded access keys.
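    A rough sketch of that, assuming the cluster runs on EKS with IAM Roles for Service Accounts (IRSA) and that an IAM role with read access to the bucket already exists (the role name below is a placeholder), is to annotate the service accounts the chart creates and then drop the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY entries from every s3-sync container. The AWS CLI then uses the web identity credentials that EKS injects into the pod.

    ```yaml
    # Placeholder role: it needs s3:ListBucket and s3:GetObject on the bucket,
    # and its trust policy must allow these service accounts.
    scheduler:
      serviceAccount:
        annotations:
          eks.amazonaws.com/role-arn: arn:aws:iam::account_id:role/airflow-dags-reader
    webserver:
      serviceAccount:
        annotations:
          eks.amazonaws.com/role-arn: arn:aws:iam::account_id:role/airflow-dags-reader
    triggerer:
      serviceAccount:
        annotations:
          eks.amazonaws.com/role-arn: arn:aws:iam::account_id:role/airflow-dags-reader
    workers:
      serviceAccount:
        annotations:
          eks.amazonaws.com/role-arn: arn:aws:iam::account_id:role/airflow-dags-reader
    ```

    These keys have to be merged into the existing scheduler, webserver, triggerer and workers sections of the values file rather than added as second top-level keys.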

    Note: I'm using a custom Airflow image, but it is built from the official Airflow image with only a couple of extra Python packages installed. The values file works with the official image as well.
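    If you use the official image instead, a minimal sketch of the image-related overrides could look like this, assuming apache/airflow at the same 2.5.1 version. Any extra Python packages your DAGs need would then be missing, and because env is a list, Helm replaces it as a whole, so the other env entries from the values file must stay in the list next to these two.

    ```yaml
    images:
      airflow:
        repository: apache/airflow   # official image instead of the private ECR repo
        tag: "2.5.1"
    env:
      # Keep the remaining env entries from the values file here as well;
      # only the worker image settings change.
      - name: "AIRFLOW__KUBERNETES_EXECUTOR__WORKER_CONTAINER_REPOSITORY"
        value: "apache/airflow"
      - name: "AIRFLOW__KUBERNETES_EXECUTOR__WORKER_CONTAINER_TAG"
        value: "2.5.1"
    ```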

    Sample DAG that I ran:

    from airflow.decorators import dag, task
    from airflow.operators.bash import BashOperator
    
    from datetime import datetime
    
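    # Runs daily from 2023-01-01 without backfilling missed intervals
    @dag(start_date=datetime(2023, 1, 1), schedule='@daily', catchup=False)
    def parallel_dag():
    
        # Three independent Bash tasks (task_1 to task_3), each sleeping for 60 seconds
        tasks = [BashOperator(task_id=f'task_{t}', bash_command='sleep 60') for t in range(1, 4)]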
    
        @task
        def task_4(data):
            print(data)
            return 'done'
        
        @task
        def task_5(data):
            print(data)
    
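        # task_5 runs after the three Bash tasks and after task_4,
        # whose return value it receives via XCom
        tasks >> task_5(task_4(42))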
    
    parallel_dag()
    

    Let me know if you run into any issues with it.