TL;DR: I am trying to build an asset that collects and returns the outputs of multiple other assets. Their number may vary depending on some parameters, but they are all defined at build time.
I tried using repo.load_asset_value, but I got KeyError: AssetKey(['for_asset_test1']), even though the asset exists in the repository.
Example of standalone code that reproduces the issue:
# temp.py
from dagster import asset, repository, define_asset_job
from dagster._core.definitions.reconstruct import repository_def_from_target_def
# Create assets in a for loop
name_suffixes = [f"{x}" for x in [1, 5, 10]]
for_asset_name = "for_asset_test"
def my_asset_factory(
    suffix: str,
    name=for_asset_name,
):
    """
    Args:
        suffix (str): The suffix appended to the asset name.
        name (str): The base name of the new asset.
    Returns:
        AssetsDefinition: The new asset.
    """
    @asset(name=f"{name}{suffix}")
    def for_asset():
        return suffix

    return for_asset
for_assets = [my_asset_factory(suffix) for suffix in name_suffixes]
# Collect the names of all assets created in the loop
all_for_assets_created = {f"{for_asset_name}{suffix}" for suffix in name_suffixes}
@asset(non_argument_deps=all_for_assets_created)
def handle_for_assets(context):
    """Collect the outputs of all non_argument_deps into a list and return it."""
    all_input_keys = context.op.input_dict.keys()
    cur_repo = repository_def_from_target_def(context.pipeline_def)
    with cur_repo.get_asset_value_loader() as loader:
        assets_results = [
            loader.load_asset_value(asset_key) for asset_key in all_input_keys
        ]
    return assets_results
all_assets_job = define_asset_job(name="all_assets_job")
@repository
def repos():
    return [handle_for_assets, all_assets_job] + for_assets
# Execute with:
# dagster job execute -f src/mock_model/temp.py --job all_assets_job
I am doing it this way because name_suffixes can be changed at will; otherwise each upstream asset would have to be written out as an argument of handle_for_assets, which is rather impractical. Please let me know if there is a better way to achieve the desired result.
To build an asset that collects and returns the outputs of multiple other assets, whose number may vary but which are all defined at build time, I would declare one AssetIn per upstream asset and receive the loaded values as keyword arguments (kwargs):
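from dagster import AssetIn, asset

# Fill this with the upstream asset definitions, e.g. the for_assets list from the question
upstream_assets = []


@asset(
    ins={
        # Input names must be valid Python identifiers, so join the key path with "__"
        "__".join(asset_def.key.path): AssetIn(key=asset_def.key)
        for asset_def in upstream_assets
    }
)
def handle_for_assets(**kwargs):
    # Each upstream asset's value arrives as a keyword argument named after its input
    result = []
    for input_name, asset_value in kwargs.items():
        result.append(asset_value)
    return result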
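The answer relies on two plain-Python behaviours: each asset key path is joined with "__" into an input name that is a valid identifier, and a variadic keyword-argument signature accepts however many inputs the ins dict declares. The sketch below mimics both without Dagster, under stated assumptions: the key paths (including the my_prefix prefix) and the helper input_name_for are hypothetical, and the dict passed to handle_for_assets stands in for the values Dagster would load for each input.

```python
from typing import Any, Dict, List, Sequence

# Hypothetical asset key paths standing in for asset_def.key.path.
# The last one has a prefix, i.e. it is a multi-part key.
upstream_key_paths: List[Sequence[str]] = [
    ["for_asset_test1"],
    ["for_asset_test5"],
    ["my_prefix", "for_asset_test10"],
]


def input_name_for(key_path: Sequence[str]) -> str:
    """Join an asset key path into one name usable as a keyword argument."""
    name = "__".join(key_path)
    if not name.isidentifier():
        raise ValueError(f"{name!r} is not a valid Python identifier")
    return name


def handle_for_assets(**kwargs: Any) -> List[Any]:
    """Collect however many keyword arguments arrive, in the order passed."""
    return list(kwargs.values())


# Simulated per-input values; here each value is just the suffix
# encoded at the end of the asset name.
loaded: Dict[str, Any] = {
    input_name_for(path): path[-1][len("for_asset_test"):]
    for path in upstream_key_paths
}

print(list(loaded))  # ['for_asset_test1', 'for_asset_test5', 'my_prefix__for_asset_test10']
print(handle_for_assets(**loaded))  # ['1', '5', '10']
```

Because keyword arguments keep the order in which they were passed (Python 3.6+), the collected list follows the order of the ins dict, which in turn follows the order of the upstream definitions.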