Can a second condor_submit_dag be executed by a node within the first DAG?


I am using Condor to perform a large number of processing tasks in a distributed way. There are two processing stages. In the first processing stage, I execute a tool UMPTEEN times to parse some subset of the source data and convert that to an intermediary file. Each execution of the tool is independent of all the others, so it lends itself well to Condor.

The catch is that the tool may decide not to output any intermediary file. Thus, I cannot know a priori how many intermediary files I will have; the number may be less than UMPTEEN. Another catch is that I do not know in advance what the name of the intermediary file will be; I only learn the filename after the tool has created it.

In the second processing stage, I execute other tools to convert each intermediary file to other destination files with different formats. I would like to use Condor for that also. But writing a submit description file for that stage requires knowing exactly how many intermediary files I have to convert and what their filenames are.

What I attempted is to have a "generate_stage2" node in my stage1 DAG that depends on completion of the first node. In the "generate_stage2" node, I run a Python script that:

  • searches for the intermediary files;
  • writes submit description files that will convert those intermediary files to the destination formats;
  • calls condor_submit_dag to perform that second DAG.

But, submitting the second DAG fails. I suspect that Condor does not like it when I call condor_submit_dag within a node that is currently running in the first DAG.

The big question

Is what I am attempting possible? Is there a way for one DAG to trigger another DAG?

Example

Following are examples of my submit description files, which hopefully explain what I have attempted.

Stage1 DAG

JOB 10_src_to_int      work/condor/10_src_to_int
JOB 20_generate_stage2 work/condor/20_generate_stage2

PARENT 10_src_to_int CHILD 20_generate_stage2

10_src_to_int

getenv = true
notification = Never
universe = vanilla
run_as_owner = true
initialdir = /foo/somewhere
executable = /bin/src_to_int

# UMPTEEN entries:
arguments = "src_data/ int_data/ --region -45 -123 -44 -122"
queue
arguments = "src_data/ int_data/ --region -46 -123 -45 -122"
queue
...

20_generate_stage2

getenv = true
notification = Never
universe = vanilla
run_as_owner = true
initialdir = /foo/somewhere
executable = /scripts/generate_stage2

arguments = "'data to share'  'between stage1'  'and stage2'"
queue

Stage2 DAG

JOB 30_int_to_dst_a work/condor/30_int_to_abc
JOB 40_int_to_dst_b work/condor/40_int_to_xyz

30_int_to_abc

# Written by the generate_stage2 script which a node in the stage1 DAG executed.
getenv = true
notification = Never
universe = vanilla
run_as_owner = true
initialdir = /foo/somewhere
executable = /bin/int_to_abc

# At most UMPTEEN entries:
arguments = "int_data/S45_W123.int out_data/S45_W123.abc"
queue
arguments = "int_data/S46_W123.int out_data/S46_W123.abc"
queue
...

40_int_to_xyz

# Written by the generate_stage2 script which a node in the stage1 DAG executed.
getenv = true
notification = Never
universe = vanilla
run_as_owner = true
initialdir = /foo/somewhere
executable = /bin/int_to_xyz

# At most UMPTEEN entries:
arguments = "int_data/S45_W123.int out_data/S45_W123.xyz"
queue
arguments = "int_data/S46_W123.int out_data/S46_W123.xyz"
queue
...

(Yes, I subset the source data into geospatial regions. In the examples I used arbitrary coordinates around 45° S 123° W, which is in the middle of the ocean. There is no significance to it.)


Solution

  • I have learned how to accomplish what I want with one DAG.

    It never occurred to me before that I do not have to write the submit files for all of the nodes in the DAG at the time I submit the DAG. As long as the submit file for a node is written before the node runs, it will work.

    What I do now is let the first node execute the tool to generate the intermediate files. Then, the second node executes a Python script which searches for the intermediate files and writes submit files for the third and fourth nodes. Finally, the third and fourth nodes run successfully.

    Example

    Following is an example of my modified submit description files.

    DAG

    JOB 10_src_to_int work/condor/10_src_to_int
    JOB 20_find_int   work/condor/20_find_int
    JOB 30_int_to_abc work/condor/30_int_to_abc
    JOB 40_int_to_xyz work/condor/40_int_to_xyz
    
    PARENT 10_src_to_int CHILD 20_find_int
    PARENT 20_find_int CHILD 30_int_to_abc
    PARENT 20_find_int CHILD 40_int_to_xyz
    

    10_src_to_int

    getenv = true
    notification = Never
    universe = vanilla
    run_as_owner = true
    initialdir = /foo/somewhere
    executable = /bin/src_to_int
    
    # UMPTEEN entries:
    arguments = "src_data/ int_data/ --region -45 -123 -44 -122"
    queue
    arguments = "src_data/ int_data/ --region -46 -123 -45 -122"
    queue
    ...
    

    20_find_int

    getenv = true
    notification = Never
    universe = vanilla
    run_as_owner = true
    initialdir = /foo/somewhere
    executable = /scripts/find_int
    
    arguments = "'data needed'  'to find'  'intermediate files'"
    queue
    

    30_int_to_abc

    # Written by the find_int script which the previous node executed.
    getenv = true
    notification = Never
    universe = vanilla
    run_as_owner = true
    initialdir = /foo/somewhere
    executable = /bin/int_to_abc
    
    # At most UMPTEEN entries:
    arguments = "int_data/S45_W123.int out_data/S45_W123.abc"
    queue
    arguments = "int_data/S46_W123.int out_data/S46_W123.abc"
    queue
    ...
    

    40_int_to_xyz

    # Written by the find_int script which the previous node executed.
    getenv = true
    notification = Never
    universe = vanilla
    run_as_owner = true
    initialdir = /foo/somewhere
    executable = /bin/int_to_xyz
    
    # At most UMPTEEN entries:
    arguments = "int_data/S45_W123.int out_data/S45_W123.xyz"
    queue
    arguments = "int_data/S46_W123.int out_data/S46_W123.xyz"
    queue
    ...
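
The second node's script might look roughly like the following. This is only a minimal sketch, not the actual find_int script: the function names, the command-line arguments, and the directory layout (`int_data/` under the initial directory, with submit files written to a separate directory) are assumptions made for illustration. The header lines and the `arguments`/`queue` pattern are copied from the submit files shown above.

```python
#!/usr/bin/env python3
"""Sketch of a find_int-style node script (names and paths are assumptions)."""
import sys
from pathlib import Path

# Header lines copied from the submit files shown above.
HEADER = """\
# Written by the find_int script which the previous node executed.
getenv = true
notification = Never
universe = vanilla
run_as_owner = true
initialdir = {initialdir}
executable = {executable}
"""


def build_submit_text(int_files, executable, out_ext, initialdir,
                      int_dir="int_data", out_dir="out_data"):
    """Return submit-file text with one arguments/queue pair per intermediate file."""
    lines = [HEADER.format(initialdir=initialdir, executable=executable)]
    for path in sorted(str(p) for p in int_files):
        stem = Path(path).stem
        lines.append(f'arguments = "{int_dir}/{stem}.int {out_dir}/{stem}.{out_ext}"')
        lines.append("queue")
    return "\n".join(lines) + "\n"


def write_stage2_submit_files(initialdir, condor_dir):
    """Find *.int files and write the submit files for the two converter nodes."""
    initialdir = Path(initialdir)
    int_files = list((initialdir / "int_data").glob("*.int"))
    # Submit-file name -> (converter executable, output extension).
    targets = {
        "30_int_to_abc": ("/bin/int_to_abc", "abc"),
        "40_int_to_xyz": ("/bin/int_to_xyz", "xyz"),
    }
    for name, (executable, ext) in targets.items():
        text = build_submit_text(int_files, executable, ext, initialdir)
        (Path(condor_dir) / name).write_text(text)
    return len(int_files)


# Hypothetical invocation: find_int <initialdir> <condor_dir>
if __name__ == "__main__" and len(sys.argv) == 3:
    count = write_stage2_submit_files(sys.argv[1], sys.argv[2])
    print(f"wrote submit files for {count} intermediate file(s)")
```

Note that if the first node produces no intermediate files at all, this sketch writes submit files without any `queue` statement, which the later nodes would probably fail on; that case needs separate handling.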