I have a Job A which needs an Azkaban flow "F" as a dependency, i.e. A should only run after F has finished. How do I declare Job A's dependency on Flow F?
Here is what I have right now related to getting the remotely stored flow "F":
from azkaban import remote

# Authenticated session against the remote Azkaban server
session = remote.Session("user@https://AZKABANURL")

# Fetch the flows of the project that contains Flow F
workflows = session.get_workflows("FlowFProjectName")
flows = workflows[u"flows"]
flow_id = flows[0]["flowId"]

# Fetch the jobs (nodes) of that flow
workflows = session.get_workflow_info("FlowFProjectName", flow_id)
node_id = workflows["nodes"][0]["id"]
Now that I have node_id, which is the name of the last job in Flow F, how do I add a dependency on Flow F to Job A? Is it something like this?
jobs["A"] = {
"type": "command",
"command": 'echo "Hello World"',
"dependencies": "F"
}
Doing the following instead gives me an error when uploading to Azkaban (by bundling Job A inside a project):
jobs["a"] = Job({"type": "command", "command": 'echo "Hello World"',"dependencies": node_id})
This is the error:
azkaban.util.AzkabanError: Installation Failed.
Error found in upload. Cannot upload.
a cannot find dependency <node_id>
Here, <node_id> stands for the actual job name, which I have obscured.
Could someone advise me on how to declare a dependency on an external flow inside a job? The external flow lives on a remote Azkaban server (which is why I have to use azkaban.remote).
I found the answer(s) to my question. As far as I can tell, the dependencies property of a job can only reference jobs inside the same project, so you cannot point it at a flow that lives in another project. That leaves two workarounds:
Option 1 (simpler to understand): poll Azkaban in a while loop, repeatedly asking whether the flow is still running, and only start Job A once it is not. The drawbacks: the loop may run for hours, and the way you check whether a flow is running is the get_running_workflows() method, which does not tell you whether a particular execution of the flow is still running, only whether any execution of that flow is running. See the sketch below.
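For reference, a minimal polling sketch (my own helper, not part of the azkaban package; I am assuming get_running_workflows() returns the Azkaban ajax response, whose "execIds" list is non-empty while any execution of the flow is running):

import time

def wait_for_flow(session, project, flow, poll_seconds=60):
    # Block until Azkaban reports no running execution of `flow`.
    while True:
        running = session.get_running_workflows(project, flow)
        if not running.get("execIds"):
            return
        time.sleep(poll_seconds)

wait_for_flow(session, "FlowFProjectName", flow_id)
# ...only now kick off Job A, modulo the caveat above.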
Option 2: if Flow F ends in a job f, and Job A must run after Flow F is done executing, add a job to the end of Flow F, say f', whose only purpose is to kick off Job A.
If this is hard to picture:
Original flow F, which Job A depends on: f1 -> f2 -> ... -> f
After adding the kick-off job f': f1 -> f2 -> ... -> f -> f'
And here, f' runs a session.run_workflow(project_A, flow_A) call to launch the flow containing Job A.
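A minimal sketch of what f' could look like ("project_A" and "flow_A" are hypothetical names for wherever Job A lives):

# kick_off_a.py -- the script that job f' runs as the last step of Flow F.
# "project_A" and "flow_A" are hypothetical names for Job A's project/flow.
from azkaban import remote

session = remote.Session("user@https://AZKABANURL")
session.run_workflow("project_A", "flow_A")

And the corresponding job definition added to Flow F's project ("f_prime" stands in for f'):

jobs["f_prime"] = Job({
    "type": "command",
    "command": "python kick_off_a.py",
    "dependencies": "f"  # run only after the last original job of Flow F
})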
This is a better approach than Option 1 because you know for sure that Job A will only be kicked off after Flow F has finished executing successfully. I hope this helps someone in the future.