I would like to use the new "Job as Task" feature (as mentioned in this SO answer), but I'm having trouble passing values into that job.
Scenario
I have an existing workflow that takes a list of entity_ids. This workflow runs totally fine on its own if started manually. For this scenario I would like to add some logic that decides how much of the workflow will be executed:
If Task_A finds specific information in its table, it should start the workflow in Task_B and provide it with a couple of parameters based on that information (in this example: a list of entity_ids). If that information is not found, the workflow should end gracefully and wait for the next interval.
My question: How do I pass (multiple) values into the job that is referenced in Task_B?
I had tried to set this with dbutils.jobs.taskValues.set("entity_ids", "[1, 2]") in Task_A and to read it with dbutils.jobs.taskValues.get("Task_A", "entity_ids", debugValue="[]") in the first Notebook of the workflow in Task_B, but this throws an error within the nested job:
Task key does not exist in run: Task_A.
My guess is that the nested workflow in Task_B is unaware of the parent workflow and might run in a different context, and therefore cannot find taskKey == "Task_A".
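The round trip I attempted can be sketched as follows. The dbutils.jobs.taskValues calls only exist inside a Databricks job run, so they appear as comments; the JSON string handling around them (my assumption about how the value is encoded) is what actually runs here:

```python
import json

# Sketch of the attempted task-value round trip (names from the question).
entity_ids = [1, 2]
serialized = json.dumps(entity_ids)  # the string passed to set()

# In Task_A (Databricks-only API, shown as a comment):
# dbutils.jobs.taskValues.set(key="entity_ids", value=serialized)

# In the nested job this get() raises "Task key does not exist in run: Task_A",
# because the nested run contains no task named Task_A:
# raw = dbutils.jobs.taskValues.get("Task_A", "entity_ids", debugValue="[]")

raw = serialized              # stand-in for a successful get()
print(json.loads(raw))        # → [1, 2]
```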
To verify my assumption, I tried setting up a (test-only) Notebook that only reads the entity_ids with the get() function. In both cases, it is always the exact same Notebook.
I tried your approach and can confirm that the behaviour of task values when used in a "Run Job" task is as you described.
Please check whether the following alternative would work for you:
In the job run by your Task_B (type "Run Job"), create a widget named entity_ids.
In your actual workflow, following Task_A, add an "If/else condition" task with a check on the task value. Refer to the docs for further information.
The true branch can then be followed by the "Run Job" task, where you pass the task value {{tasks.Task_A.values.entity_ids}} to the widget as a parameter. I can confirm that this works.
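For completeness, here is how a notebook inside the nested job might read that parameter. The widget name entity_ids and the JSON encoding are assumptions carried over from above, and the dbutils.widgets call is commented out because it only exists on Databricks:

```python
import json

# Inside the job run by Task_B: the widget receives the rendered task value
# (e.g. "[1, 2]") as a string, so it must be parsed back into a list.
# On Databricks you would read it with:
#   raw = dbutils.widgets.get("entity_ids")
raw = "[1, 2]"                 # stand-in for the widget value
entity_ids = json.loads(raw)   # back to a Python list
print(entity_ids)              # → [1, 2]
```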
Alternate Suggestion:
Considering your number of parameters, see if you can set a simple flag as the task value, write the entity_ids to a file in DBFS or cloud storage, use the conditional on the task value flag, and then read the file in Task_B. There may also be a limit on the number of characters that a task value or widget can accept.