
How to use repository.load_asset_value from within a running op/asset?

TL;DR: I am trying to build an asset that collects and returns the outputs of multiple other assets, which may vary in number given some parameters, but are all defined at build time.

I tried using repo.load_asset_value, but it raised KeyError: AssetKey(['for_asset_test1']), even though the asset exists in the repository.

Standalone code that reproduces the issue:

from dagster import asset, repository, job, define_asset_job
import py_compile

from dagster._core.definitions.reconstruct import repository_def_from_target_def

# Create assets in a for loop
name_suffixes = [f"{x}" for x in [1, 5, 10]]
for_asset_name = "for_asset_test"

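def my_asset_factory(
    suffix: str,
):
    """
    Args:
        suffix (str): Suffix appended to for_asset_name to form the new asset's name.

    Returns:
        function: The new asset.
    """

    @asset(name=f"{for_asset_name}{suffix}")
    def for_asset():
        return suffix

    return for_asset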

for_assets = [my_asset_factory(suffix) for suffix in name_suffixes]

# Names of all assets created in the for loop
all_for_assets_created = {f"{for_asset_name}{suffix}" for suffix in name_suffixes}

@asset(non_argument_deps=all_for_assets_created)
def handle_for_assets(context):
    """Collect all non_argument_deps outputs into a list and return the list"""

    all_input_keys = context.op.input_dict.keys()
    cur_repo = repository_def_from_target_def(context.pipeline_def)
    with cur_repo.get_asset_value_loader() as loader:
        assets_results = [
            loader.load_asset_value(asset_key) for asset_key in all_input_keys
        ]

    return assets_results

all_assets_job = define_asset_job(name="all_assets_job")

@repository
def repos():
    return [handle_for_assets, all_assets_job] + for_assets

# Execute with:
# dagster job execute -f src/mock_model/ --job all_assets_job

I am trying it this way because name_suffixes can change at will, so each upstream asset would otherwise have to be written out as an argument of handle_for_assets, which is impractical. Please let me know if there is a better way to achieve the desired result.


  • To build an asset that collects and returns the outputs of multiple other assets, which may vary in number given some parameters, but are all defined at build time, I would use kwargs with AssetIns:

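    from dagster import AssetIn, asset

    upstream_assets = []  # fill with the upstream asset definitions, e.g. for_assets from the question
    @asset(
        ins={
            "__".join(asset_def.key.path): AssetIn(key=asset_def.key)
            for asset_def in upstream_assets
        }
    )
    def handle_for_assets(**kwargs):
        result = []
        for input_name, asset_value in kwargs.items():
            result.append(asset_value)
        return result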