Tags: python, rest, airflow, schedule, directed-acyclic-graphs

Scheduling SQL Queries inside Airflow DAG


I have a use case where we extract SQL queries from a REST endpoint and schedule them with an external scheduler, Airflow in our case. I was thinking of solving it with the following steps.

  1. Create a DAG in Airflow that runs every hour and extracts the queries from the REST API.
  2. Execute all the SQL queries in parallel.
  3. Send a Slack message to the user once a query succeeds.

The issue is that we also pull a schedule interval for each query (for example @daily, @weekly, or @hourly), along with a start_time and an end_time. Based on these, we want to filter the SQL queries and run only those that are due at that moment.

Any idea how we can do that, or is there another way to tackle this use case? Thanks in advance.


Solution

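  • You can call get_previous_dagrun on the current DAG run to get the previous successful run and read its end_date. If there is no previous successful run, fall back to the DAG's latest execution date. Each query's schedule can then be compared against that timestamp.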

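    # Import paths as in Airflow 2.x.
    from airflow.decorators import task
    from airflow.models import DagRun
    from airflow.operators.python import get_current_context

    @task
    def get_last_run():
        context = get_current_context()
        # Most recent earlier run of this DAG that finished successfully, if any.
        dag_run: DagRun = context["dag_run"].get_previous_dagrun(state="success")
        if dag_run:
            print(dag_run.end_date)
            return str(dag_run.end_date)
        # No earlier successful run: fall back to the DAG's latest execution date.
        return str(context["dag"].get_latest_execution_date())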
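
Once you have the end time of the last successful run, one way to filter the queries is to check whether a scheduled tick of each query's preset falls between that time and now. The following is only a rough sketch using the standard library. The field names `sql`, `schedule_interval`, `start_time` and `end_time` and the helper names `latest_tick`, `is_due` and `select_due` are assumptions for illustration, not anything your REST API or Airflow defines, and it handles only the three presets mentioned in the question.

```python
from datetime import datetime, timedelta, timezone


def latest_tick(preset: str, now: datetime) -> datetime:
    """Return the most recent scheduled time at or before `now` for a preset."""
    hour_floor = now.replace(minute=0, second=0, microsecond=0)
    day_floor = hour_floor.replace(hour=0)
    if preset == "@hourly":
        return hour_floor
    if preset == "@daily":
        return day_floor
    if preset == "@weekly":
        # Airflow's @weekly preset fires at midnight on Sunday.
        # weekday() is 0 for Monday, so Sunday gives an offset of 0 days.
        return day_floor - timedelta(days=(day_floor.weekday() + 1) % 7)
    raise ValueError(f"unsupported preset: {preset}")


def is_due(query: dict, window_start: datetime, window_end: datetime) -> bool:
    """True if the query has a tick in (window_start, window_end]
    that also lies within its own start_time..end_time range."""
    tick = latest_tick(query["schedule_interval"], window_end)
    return window_start < tick and query["start_time"] <= tick <= query["end_time"]


def select_due(queries: list, window_start: datetime, window_end: datetime) -> list:
    """Keep only the queries that should run in this window."""
    return [q for q in queries if is_due(q, window_start, window_end)]


if __name__ == "__main__":
    utc = timezone.utc
    # Hypothetical payload shape, as it might look after parsing the REST response.
    queries = [
        {"sql": "SELECT 1", "schedule_interval": "@hourly",
         "start_time": datetime(2024, 1, 1, tzinfo=utc),
         "end_time": datetime(2024, 12, 31, tzinfo=utc)},
        {"sql": "SELECT 2", "schedule_interval": "@daily",
         "start_time": datetime(2024, 1, 1, tzinfo=utc),
         "end_time": datetime(2024, 12, 31, tzinfo=utc)},
    ]
    last_run_end = datetime(2024, 1, 7, 0, 5, tzinfo=utc)
    now = datetime(2024, 1, 7, 1, 5, tzinfo=utc)
    for q in select_due(queries, last_run_end, now):
        print(q["sql"])
```

In the DAG, `window_start` would be the value returned by `get_last_run` (parsed back into a datetime) and `window_end` the current time. The queries that pass the filter could then be fanned out to parallel tasks. Arbitrary cron expressions would need a cron parser rather than this hand-written preset handling.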