
How to check a condition before starting Databricks Workflows jobs?


I already have my pipelines ready in Databricks Workflows, but based on a few condition checks on the input data I have to decide which path of the pipeline to execute. Is there any service or technique available in Databricks Workflows that lets me perform a predefined condition check and decide which path to execute?

Note: I am running Databricks on Azure, but the requirement is to keep this pipeline as Databricks-native as possible, so I cannot use Data Factory.


Solution

  • Use If/else condition tasks and taskValues to achieve this.

    With the help of a Python notebook, you can set task values that can be shared among the workflow's tasks.

    Create a Python notebook that checks the condition based on your data, then use dbutils to set the task values.

    Example code:

    # Placeholder: replace this with the real check on your input data.
    base_pipeline = True

    # Publish the decision so the If/else condition task can read it.
    dbutils.jobs.taskValues.set(key='Start_base_pipeline', value=bool(base_pipeline))
    

    This sets the task value named Start_base_pipeline, which can be accessed in other workflow tasks.
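If a downstream notebook task also needs this flag, it can read it with `dbutils.jobs.taskValues.get`. The following is only a minimal sketch: the helper name `read_start_flag` is hypothetical, and it assumes the upstream task is named `Check_pipeline`, as in the dynamic expression used later in this answer. `dbutils` is passed in explicitly so the function can also be exercised outside a notebook.

```python
def read_start_flag(dbutils, task_key="Check_pipeline", key="Start_base_pipeline"):
    """Return the flag published by the upstream task (hypothetical helper)."""
    return dbutils.jobs.taskValues.get(
        taskKey=task_key,   # name of the task that called taskValues.set
        key=key,            # same key that was used when setting the value
        default=False,      # returned if the key cannot be found
        debugValue=False,   # returned when the notebook runs outside a job
    )
```

Inside a Databricks notebook you would call it as `read_start_flag(dbutils)`.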

    Refer to the following documentation for more information about task values:

    Share information between tasks in a Databricks job

    Next, create an If/else condition task that checks this task value and triggers your pipeline accordingly.

    Retrieve the task value using a dynamic expression like this:

    {{tasks.[task_name].values.[value_name]}}

    In this case, it's {{tasks.Check_pipeline.values.Start_base_pipeline}}
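If you define the job as JSON rather than through the UI, the same check might look roughly like the sketch below. The `condition_task` fields (`op`, `left`, `right`) follow the Jobs API as I understand it, so please verify them against the API reference. The task key `Is_base_pipeline` is a made-up name, and the right-hand operand assumes that a boolean `True` task value is rendered as the string `true`.

```python
import json


def task_value_ref(task_name, value_name):
    """Build the dynamic value reference for a task value."""
    return "{{tasks.%s.values.%s}}" % (task_name, value_name)


# Sketch of an If/else condition task definition (names are illustrative).
condition_task = {
    "task_key": "Is_base_pipeline",  # hypothetical task name
    "depends_on": [{"task_key": "Check_pipeline"}],
    "condition_task": {
        "op": "EQUAL_TO",
        "left": task_value_ref("Check_pipeline", "Start_base_pipeline"),
        "right": "true",  # assumption: True is compared as the string "true"
    },
}

print(json.dumps(condition_task, indent=2))
```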

    Learn more about dynamic values by clicking Browse dynamic values.

    Next, add tasks for True and False conditions.

    Since you mentioned you have base and incremental pipelines, add two tasks of type Run Job, one for each pipeline, and make each one depend on the matching outcome (true or false) of the If/else condition task.
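In a JSON job definition, the two branches could be sketched as follows. This is an illustration under assumptions, not the exact configuration from the screenshots: the task keys and job IDs are placeholders, the base job is assumed to sit on the true branch, and the `depends_on` `outcome` and `run_job_task` fields should be checked against the Jobs API reference.

```python
def branch_tasks(condition_key, base_job_id, incremental_job_id):
    """Return two Run Job tasks, one per outcome of the condition task."""
    return [
        {
            "task_key": "Run_base_pipeline",  # hypothetical task name
            # Runs only when the condition task evaluates to true.
            "depends_on": [{"task_key": condition_key, "outcome": "true"}],
            "run_job_task": {"job_id": base_job_id},
        },
        {
            "task_key": "Run_incremental_pipeline",  # hypothetical task name
            # Runs only when the condition task evaluates to false.
            "depends_on": [{"task_key": condition_key, "outcome": "false"}],
            "run_job_task": {"job_id": incremental_job_id},
        },
    ]


# Placeholder job IDs; replace them with the IDs of your existing jobs.
tasks = branch_tasks("Is_base_pipeline", base_job_id=111, incremental_job_id=222)
```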

    Below is the full pipeline.

    [Screenshot: the full pipeline]

    Output:

    [Screenshot: the job run output]