python dagster

Add param to dagster input


Hello, I have the following @job in Dagster:

@job
def job_extract_faces():
    faces = op_extract_data(op_get_data_path())
    r = op_process((faces, 'a'))
    r = op_process((faces, 'b'))
    r = op_process((faces, 'c'))
    r = op_process((faces, 'd'))

The problem is that Dagster says the input of op_process should be the output of op_extract_data.

Is there any way to add a parameter instead of building four functions?

Thanks


Solution

  • I think you're looking for Dynamic Graphs. Using this pattern you'd emit your parameters as DynamicOutputs from an upstream op and map that output across op_process. One option would be to do something like:

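    from dagster import op, job, DynamicOut, DynamicOutput


    # Emit one dynamic output per configured parameter, e.g. "a", "b", "c", "d".
    @op(config_schema={"param_list": [str]},
        out=DynamicOut(str))
    def param_generator(context):
        for i, p in enumerate(context.op_config["param_list"]):
            # mapping_key must be unique for each dynamic output
            yield DynamicOutput(p, mapping_key=str(i))

    # op_get_data_path, op_extract_data and op_process are your existing ops;
    # op_process is assumed to take two inputs (faces and the parameter).
    @job
    def job_extract_faces():
        faces = op_extract_data(op_get_data_path())
        # Run op_process once per emitted parameter, passing faces each time.
        param_generator().map(lambda p: op_process(faces, p))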

    This is similar to the mapping example in the docs that shows how to do dynamic mapping with additional arguments.