python-2.7, azkaban

How to add a flow as a dependency for an Azkaban job using the Azkaban Python API?


I have a Job A which needs to invoke an Azkaban flow "F" as a dependency. How do I declare Job A's dependency on Flow F?

Here is what I have so far for fetching the remotely stored Flow F:

from azkaban import remote

# Connect to the Azkaban server and list the flows in Flow F's project.
session = remote.Session("user@https://AZKABANURL")
workflows = session.get_workflows("FlowFProjectName")
flows = workflows[u"flows"]
flow_id = flows[0]["flowId"]

# Fetch the flow's job graph and take the id (job name) of its first node.
workflows = session.get_workflow_info("FlowFProjectName", flow_id)
node_id = workflows["nodes"][0]["id"]

Now that I have node_id, which is the name of the last job in Flow F, how do I add a dependency on Flow F in Job A? Is it like so?

jobs["A"] = {
    "type": "command", 
    "command": 'echo "Hello World"', 
    "dependencies": "F"
}

Doing the following gives me an error when uploading to Azkaban (bundling this Job A inside a project):

jobs["a"] = Job({"type": "command", "command": 'echo "Hello World"',"dependencies": node_id})

This is the error:

azkaban.util.AzkabanError: Installation Failed.
Error found in upload. Cannot upload.
a cannot find dependency <node_id>

Here, <node_id> is the actual name of the job, which I have obscured.

Could someone advise me on how to add these dependencies on external flows inside a job? The external flow lives on Azkaban (which is why I have to use azkaban.remote).


Solution

  • I found the answer(s) to my question:

    1. Call a remote flow and wait for it to finish (in a while loop)
    2. Use a kick-off job which makes Flow F call its dependent job.

    Option 1: This is the simpler one to comprehend - you use a while loop to keep asking Azkaban whether a particular job/flow is still running. But this means the while loop may keep running for hours and hours, and the way you check whether a flow is running is the get_running_workflows() method. This method does not tell you whether a certain instance of a flow is still running; it only tells you whether any instance of said flow is running. A rough sketch of this approach follows below.
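
    A minimal sketch of option 1, assuming the flow is named "F" inside "FlowFProjectName", Job A's flow lives in a project "ProjectA", and get_running_workflows(project, flow) returns a dict with an "execIds" list (these names and the response shape are my assumptions, not anything fixed by the question):

    import time
    from azkaban import remote

    session = remote.Session("user@https://AZKABANURL")

    # Keep asking Azkaban whether any execution of Flow F is still running.
    # Caveat from above: this cannot tell one execution apart from another.
    while session.get_running_workflows("FlowFProjectName", "F").get("execIds"):
        time.sleep(60)  # poll once a minute

    # No execution of F is reported as running any more, so kick off Job A's flow.
    session.run_workflow("ProjectA", "A")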

    Option 2: If Flow F ends in a job f, and Job A is required to run after Flow F has finished executing, you add a job, say f', to the end of Flow F, so that f' kicks off Job A.

    If this is hard to understand:

    Original job graph of Flow F, which Job A depends on: f1 -> f2 -> ... -> f

    After adding the kick-off job f': f1 -> f2 -> ... -> f -> f'

    And here, f' includes a session.run_workflow(project_A, flow_A) call.

    This is a better approach than option 1 because you know for sure that Job A will only be kicked off after Flow F has finished executing successfully. A sketch of such a kick-off job is below. I hope this helps someone in the future.
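
    A rough sketch of option 2 using the azkaban Python package (the job name f_prime, the kick_off_A.py script, and the project names are my own placeholders):

    from azkaban import Job, Project

    project = Project("FlowFProjectName")
    # f' depends on f, the last "real" job of Flow F, so it only runs once
    # the rest of the flow has succeeded.
    project.add_job("f_prime", Job({
        "type": "command",
        "command": "python kick_off_A.py",
        "dependencies": "f",
    }))

    And kick_off_A.py, the script that f' executes, simply triggers Job A's flow remotely:

    from azkaban import remote

    session = remote.Session("user@https://AZKABANURL")
    session.run_workflow("ProjectA", "A")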