I have dagster==1.0.11 and I am trying to materialize a downstream_asset independently from its upstream_asset. I want to do it through the python API. Please consider the code below for reference.
# example.py
from dagster import asset, materialize, repository


@asset
def upstream_asset():
    return [1, 2, 3]


@asset
def downstream_asset(upstream_asset):
    return upstream_asset + [4]


@repository
def repo():
    return [upstream_asset, downstream_asset]


if __name__ == "__main__":
    # Try to materialize only the downstream asset
    materialize([downstream_asset])
When I run python example.py, it fails with:
dagster._core.errors.DagsterInvalidDefinitionError: Input asset '["upstream_asset"]' for asset '["downstream_asset"]' is not produced by any of the provided asset ops and is not one of the provided sources
I would like to achieve the same behavior I get from the dagit UI.
In dagit (UI), if I select downstream_asset and click "Materialize selected", I get
FileNotFoundError: [Errno 2] No such file or directory: '/Users/pedro.viana/dev/nu/mock-model/dagster_home/storage/upstream_asset' (full exception in DETAILS section, below)
But if I first select upstream_asset and click materialize (it succeeds), then delete everything in the DAGSTER_HOME directory except the storage/upstream_asset file (erasing all metadata but leaving the persisted result of the upstream asset in place), and then launch dagit again, select downstream_asset and click "Materialize selected", it succeeds. Dagit has no metadata about upstream_asset; it simply checks whether the file is where upstream_asset's io_manager says it should be. The file is there, so downstream_asset's materialization succeeds.
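To make the behavior I am describing concrete, here is a simplified, standard-library-only sketch of what I understand a filesystem IO manager to be doing. TinyFilesystemStore, run_downstream and demo are hypothetical names for illustration, not Dagster classes or functions, and the real fs_io_manager may differ in its details.

```python
import pickle
import tempfile
from pathlib import Path


class TinyFilesystemStore:
    """Hypothetical stand-in for a filesystem IO manager (not Dagster's class)."""

    def __init__(self, base_dir):
        self.base_dir = Path(base_dir)

    def path_for(self, asset_name):
        # The storage location depends only on the asset name, not on run metadata.
        return self.base_dir / "storage" / asset_name

    def handle_output(self, asset_name, value):
        path = self.path_for(asset_name)
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, "wb") as f:
            pickle.dump(value, f)

    def load_input(self, asset_name):
        # Raises FileNotFoundError if nothing was persisted at the expected path.
        with open(self.path_for(asset_name), "rb") as f:
            return pickle.load(f)


def run_downstream(store):
    # Load the upstream value from storage, compute, and persist the result.
    upstream_value = store.load_input("upstream_asset")
    result = upstream_value + [4]
    store.handle_output("downstream_asset", result)
    return result


def demo():
    with tempfile.TemporaryDirectory() as base_dir:
        # Someone, somewhere, persisted the upstream result.
        TinyFilesystemStore(base_dir).handle_output("upstream_asset", [1, 2, 3])
        # A brand-new store object knows nothing about that earlier run,
        # yet the downstream computation succeeds because the file exists.
        return run_downstream(TinyFilesystemStore(base_dir))
```

Calling run_downstream against an empty directory raises FileNotFoundError, which mirrors the first dagit failure; calling it after the upstream file has been written succeeds, which mirrors the second case.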
How can I achieve this behavior with the Python API? I would like to call materialize([downstream_asset]) and have it succeed if it finds the upstream persisted result where it should be.
CONTEXT: I can have multiple instances of dagster running in multiple environments, all with the same code version (and an s3_io_manager). So, if someone materializes an upstream_asset, it is available to everyone else running the downstream_asset, even if they do not know who materialized the upstream asset or when.
Full exception:
dagster._core.errors.DagsterExecutionLoadInputError: Error occurred while loading input "upstream_asset" of step "downstream_asset":
File "/Users/pedro.viana/miniforge3/envs/dagtest/lib/python3.6/site-packages/dagster/_core/execution/plan/execute_plan.py", line 224, in dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/Users/pedro.viana/miniforge3/envs/dagtest/lib/python3.6/site-packages/dagster/_core/execution/plan/execute_step.py", line 320, in core_dagster_event_sequence_for_step
step_input.source.load_input_object(step_context, input_def)
File "/Users/pedro.viana/miniforge3/envs/dagtest/lib/python3.6/site-packages/dagster/_core/execution/plan/inputs.py", line 201, in load_input_object
yield from _load_input_with_input_manager(loader, load_input_context)
File "/Users/pedro.viana/miniforge3/envs/dagtest/lib/python3.6/site-packages/dagster/_core/execution/plan/inputs.py", line 867, in _load_input_with_input_manager
value = input_manager.load_input(context)
File "/Users/pedro.viana/miniforge3/envs/dagtest/lib/python3.6/contextlib.py", line 99, in __exit__
self.gen.throw(type, value, traceback)
File "/Users/pedro.viana/miniforge3/envs/dagtest/lib/python3.6/site-packages/dagster/_core/execution/plan/utils.py", line 82, in solid_execution_error_boundary
) from e
The above exception was caused by the following exception:
FileNotFoundError: [Errno 2] No such file or directory: '/Users/pedro.viana/dev/nu/mock-model/dagster_home/storage/upstream_asset'
File "/Users/pedro.viana/miniforge3/envs/dagtest/lib/python3.6/site-packages/dagster/_core/execution/plan/utils.py", line 47, in solid_execution_error_boundary
File "/Users/pedro.viana/miniforge3/envs/dagtest/lib/python3.6/site-packages/dagster/_core/execution/plan/inputs.py", line 867, in _load_input_with_input_manager
value = input_manager.load_input(context)
File "/Users/pedro.viana/miniforge3/envs/dagtest/lib/python3.6/site-packages/dagster/_core/storage/fs_io_manager.py", line 181, in load_input
with open(filepath, self.read_mode) as read_obj:
I found a way to do this using an experimental feature. I would still like to know if there is a recommended (not experimental) approach.
I can use build_assets_job to build a separate job for executing each asset. To build the downstream job, I need to declare upstream_asset as a source.
The resulting behavior is exactly the one I expected, just like running from the dagit UI.
The code is below; running the script is enough to observe the behavior.
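from dagster import asset
from dagster._core.definitions.assets_job import build_assets_job


@asset
def upstream_asset():
    return [1, 2, 3]


# Job that materializes only the upstream asset
upstream_only = build_assets_job(name="upstream_only", assets=[upstream_asset])


@asset
def downstream_asset(upstream_asset):
    return upstream_asset + [4]


# Job that materializes only the downstream asset; upstream_asset is passed
# as a source, so it is loaded from storage instead of being recomputed
downstream_only = build_assets_job(
    name="downstream_only",
    assets=[downstream_asset],
    source_assets=[upstream_asset],
)

if __name__ == "__main__":
    import os
    import pathlib

    job_name = "downstream_only"
    path_to_module = pathlib.Path(__file__).resolve()
    command = os.system(f"dagster job execute -f {path_to_module} -a {job_name}")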