Hello, I have the following @job in Dagster:
@job
def job_extract_faces():
    faces = op_extract_data(op_get_data_path())
    r = op_process((faces, 'a'))
    r = op_process((faces, 'b'))
    r = op_process((faces, 'c'))
    r = op_process((faces, 'd'))
The problem is that Dagster says the input of op_process should be the output of op_extract_data.
Is there any way to add a parameter instead of building 4 functions?
Thanks
I think you're looking for Dynamic Graphs. Using this pattern, you'd emit your parameters as DynamicOutputs from an upstream op and map that output across op_process. One option would be to do something like:
from dagster import op, job, DynamicOut, DynamicOutput

@op(config_schema={"param_list": [str]}, out=DynamicOut(str))
def param_generator(context):
    for i, p in enumerate(context.op_config["param_list"]):
        yield DynamicOutput(p, mapping_key=str(i))

@job
def job_extract_faces():
    faces = op_extract_data(op_get_data_path())
    param_generator().map(lambda p: op_process(faces, p))
This is similar to the mapping example in the docs, which shows how to do dynamic mapping with additional arguments.
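For completeness, param_generator reads its values from op config, so the four parameters from the question would be supplied as run config when the job is launched. A minimal sketch of that config, assuming a Dagster version where op config lives under the "ops" key, and assuming the values 'a' through 'd' from the original job:

```python
# Run config for job_extract_faces: op config is keyed by op name under "ops".
# The values below are the four parameters from the question (an assumption
# about what you want to map over).
run_config = {
    "ops": {
        "param_generator": {
            "config": {
                "param_list": ["a", "b", "c", "d"],
            }
        }
    }
}

# It would then be passed at launch time, for example:
#   job_extract_faces.execute_in_process(run_config=run_config)
# or pasted (as YAML) into the Launchpad in the UI.
```

Each entry in param_list becomes one DynamicOutput, so op_process runs once per parameter, with the same faces input every time.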