I have the following Airflow DAG which uploads a single local file into an S3 bucket.
# airflow related
from airflow import DAG
from airflow.operators.python import PythonOperator
# other packages
from datetime import datetime
import boto3
def file_upload():
    # Create a session with boto3 using the AWS credentials.
    session = boto3.Session(
        aws_access_key_id='my_access_key_id',
        aws_secret_access_key='my_aws_secret_access_key'
    )
    # Create an S3 resource from the session.
    s3 = session.resource('s3')
    # Upload the local file into the bucket under the given key.
    result = s3.Bucket('flight-data-test-bucket').upload_file('/opt/airflow/dags/pricedata.xlsx', 'pricedata.xlsx')
    return result
with DAG(
    dag_id='upload_to_s3',
    start_date=datetime(2020, 5, 5),
    schedule_interval='@once',
    catchup=False,
) as dag:
    # Upload the file
    task_file_to_s3 = PythonOperator(
        task_id='upload_to_s3',
        python_callable=file_upload
    )
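As a side note, the same upload could also go through the Amazon provider's S3Hook so that the credentials live in an Airflow connection rather than in the DAG file. This is only a rough sketch, assuming apache-airflow-providers-amazon is installed and an AWS connection with the id aws_default has been configured; the bucket, key and file path are just the ones from my code above.

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def file_upload_with_hook():
    # The hook reads its credentials from the 'aws_default' Airflow connection.
    hook = S3Hook(aws_conn_id='aws_default')
    # Upload the same local file to the same bucket and key as above.
    hook.load_file(
        filename='/opt/airflow/dags/pricedata.xlsx',
        key='pricedata.xlsx',
        bucket_name='flight-data-test-bucket',
        replace=True,
    )

This callable could be wired into the same PythonOperator in place of file_upload.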
The DAG is imported into Airflow without any errors; however, when I trigger a run manually, nothing happens.
When I check the Task Instance Details, it says "Dependencies Blocking Task From Getting Scheduled", with the dependency "Task Instance State" and the reason "Task is in the 'queued' state."
I assume this might be related to something going wrong with the start_date or schedule_interval, but I'm not sure. Any ideas? I have been running Airflow on Windows through Docker.
It turned out that there was no problem with my DAG; something was wrong with my Docker setup instead.
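One quick way to check that the DAG code itself is fine is to run the task directly with the Airflow CLI from inside the scheduler container, which bypasses the scheduler entirely. Just a sketch, assuming an Airflow 2 CLI (the dag_id and task_id are both upload_to_s3 here):

# Run the single task once for a given logical date, without the scheduler
airflow tasks test upload_to_s3 upload_to_s3 2020-05-05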
After removing everything (containers, images, etc.), I deleted all my Airflow data and ran a clean install by following this tutorial. Now my DAG is working.
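For anyone who ends up in the same place, the reset roughly amounts to the following, assuming the official docker-compose based setup from the Airflow documentation (the airflow-init service name comes from that docker-compose.yaml):

# Stop everything and remove containers, volumes (metadata DB, logs) and images
docker compose down --volumes --rmi all

# Re-initialize the metadata database and the default user
docker compose up airflow-init

# Start Airflow again in the background
docker compose up -d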