apache-spark, databricks, scheduled-tasks, azure-databricks, databricks-workflows

How to pass parameters to a "Job as Task" from code?


I would like to use the new "Job as Task" feature (as mentioned in this SO answer), but I'm having trouble passing values into that job.

Scenario

  • I have a workflow job which contains 2 tasks.
  • The workflow is scheduled to run every x minutes.
  • Task_A (type "Notebook"): Reads data from a table and, based on its contents, decides whether the workflow in Task_B should be executed (or not).
  • Task_B (type "Run Job"): This references another workflow that itself consists of multiple tasks, all of type "Notebook". That workflow takes several parameters; for brevity, let's just assume a single parameter entity_ids. It runs fine on its own when started manually.

For this scenario I would like to add some logic that decides how much of the workflow will be executed:

If Task_A finds specific information in its table, it should start the workflow in Task_B and provide it with a couple of parameters based on that information (in this example: a list of entity_ids). If that information is not found, the workflow should end gracefully and wait for the next interval.

My question: How do I pass (multiple) values into the job that is referenced in Task_B?

I tried to set this with dbutils.jobs.taskValues.set("entity_ids", "[1, 2]") in Task_A and to read it with dbutils.jobs.taskValues.get("Task_A", "entity_ids", debugValue="[]") in the first notebook of the workflow in Task_B, but this throws an error within the nested job: Task key does not exist in run: Task_A.
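For reference, roughly what that attempt looks like (the table lookup that produces the ids is omitted; the literal list is just a stand-in):

    # Task_A notebook: publish the ids as a task value (stringified list)
    entity_ids = [1, 2]  # stand-in for the result of the table lookup
    dbutils.jobs.taskValues.set(key="entity_ids", value=str(entity_ids))

    # First notebook of the nested job run by Task_B: try to read the task value.
    # This is the call that fails with "Task key does not exist in run: Task_A",
    # because the nested run contains no task named Task_A.
    entity_ids = dbutils.jobs.taskValues.get(
        taskKey="Task_A", key="entity_ids", debugValue="[]"
    )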

My guess is that the nested workflow in Task_B is unaware of the parent workflow and might be run in a different context, and therefore cannot find taskKey == "Task_A".

To verify my assumption, I set up a (test-only) Notebook that only reads the entity_ids with the get() function.

  • If I add it directly as a task, into the same workflow as Task_A, it works.
  • If I put it into its own workflow, and then reference that as a "Run Job" task, it fails with the aforementioned error.

In both cases, it is always the exact same Notebook.


Solution

  • I tried your approach and can confirm that the behaviour of task values, when used in a "Run Job" task, is as you describe.

    Please check whether the following alternative works for you:

    • In the job referenced by your Task_B (type "Run Job"), create a widget named entity_ids

    • In your actual workflow, after Task_A, add an "If/else condition" task that checks the task value. Refer to the docs for further information.

      • For testing I used {{tasks.Tst_TaskVal_Set.values.entity_id}} != "" and triggered the Run Job only if the value is not empty. You can click on Dynamic Values to see the format and other similar values.
    • The true branch can then be followed by the Run Job task, where you pass the task value {{tasks.Task_A.values.entity_ids}} to the widget as a job parameter. I can confirm that this works (a sketch of the reading side follows below).
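
    A rough sketch of the reading side, assuming the nested job's first notebook defines a text widget named entity_ids and the parent passes the stringified list into it:

        import json

        # First notebook of the job referenced by Task_B:
        # read the widget/job parameter instead of a task value.
        dbutils.widgets.text("entity_ids", "[]")  # "[]" is a hypothetical default
        entity_ids = json.loads(dbutils.widgets.get("entity_ids"))

        if entity_ids:
            print(f"Processing entities: {entity_ids}")
        else:
            print("No entity ids passed in, nothing to do.")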

    Alternate Suggestion:

    Considering the number of parameters, you could instead set a simple flag as the task value, write the entity_ids to a file in DBFS or cloud storage, use the conditional on that flag, and then read the file in Task_B (see the sketch below). Also, there may be a limit on the number of characters that a task value/widget can accept.
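
    A rough sketch of that hand-off, with hypothetical path and key names:

        import json

        # Task_A notebook: persist the ids and expose only a small flag as a task value.
        entity_ids = [1, 2]                         # stand-in for the lookup result
        handoff_path = "dbfs:/tmp/entity_ids.json"  # hypothetical location
        dbutils.fs.put(handoff_path, json.dumps(entity_ids), True)
        dbutils.jobs.taskValues.set(key="has_entities", value=len(entity_ids) > 0)

        # First notebook of the job referenced by Task_B: read the file back.
        entity_ids = json.loads(dbutils.fs.head("dbfs:/tmp/entity_ids.json"))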