
How to use repository.load_asset_value from within a running op/asset?


TL;DR: I am trying to build an asset that collects and returns the outputs of multiple other assets, which may vary in number given some parameters, but are all defined at build time.

I tried using repo.load_asset_value, but I got KeyError: AssetKey(['for_asset_test1']), even though the asset exists in the repository.

Example of standalone code reproducing the issue:

    # temp.py
    from dagster import asset, define_asset_job, repository
    from dagster._core.definitions.reconstruct import repository_def_from_target_def

    # Create assets in a for loop
    name_suffixes = [f"{x}" for x in [1, 5, 10]]
    for_asset_name = "for_asset_test"


    def my_asset_factory(
        suffix: str,
        name=for_asset_name,
    ):
        """
        Args:
            suffix (str): The suffix appended to the asset name.
            name (str): The base name of the new asset.

        Returns:
            AssetsDefinition: The new asset.
        """

        @asset(name=f"{name}{suffix}")
        def for_asset():
            return suffix

        return for_asset


    for_assets = [my_asset_factory(suffix) for suffix in name_suffixes]

    # Collect the names of all assets created in the for loop
    all_for_assets_created = {f"{for_asset_name}{suffix}" for suffix in name_suffixes}


    @asset(non_argument_deps=all_for_assets_created)
    def handle_for_assets(context):
        """Collect all non_argument_deps outputs into a list and return the list."""

        all_input_keys = context.op.input_dict.keys()
        cur_repo = repository_def_from_target_def(context.pipeline_def)
        with cur_repo.get_asset_value_loader() as loader:
            assets_results = [
                loader.load_asset_value(asset_key) for asset_key in all_input_keys
            ]

        return assets_results


    all_assets_job = define_asset_job(name="all_assets_job")


    @repository
    def repos():
        return [handle_for_assets, all_assets_job] + for_assets


    # Execute with:
    # dagster job execute -f src/mock_model/temp.py --job all_assets_job

I am taking this approach because name_suffixes can change at will, and writing every resulting asset as an argument of handle_for_assets would be impractical. Please let me know if there is a better way to achieve the desired result.


Solution

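  • To build an asset that collects and returns the outputs of a variable number of other assets, all of which are defined at build time, I would use **kwargs together with one AssetIn per upstream asset:

    from dagster import AssetIn, asset

    # The assets to collect, e.g. the for_assets list built by my_asset_factory above
    upstream_assets = for_assets


    @asset(
        ins={
            # Input names must be valid Python identifiers, so the key path is joined with "__"
            "__".join(asset_def.key.path): AssetIn(key=asset_def.key)
            for asset_def in upstream_assets
        }
    )
    def handle_for_assets(**kwargs):
        """Collect the values of all upstream assets into a list."""
        # kwargs maps each input name to the loaded value of its upstream asset
        result = []

        for input_name, asset_value in kwargs.items():
            result.append(asset_value)

        return result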
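The mechanics of the ins comprehension and **kwargs can be checked without Dagster at all. The plain-Python sketch below is only an illustration under stated assumptions: the tuples stand in for AssetKey.path, the my_group prefix is a made-up multi-part key, and collect_values is a hypothetical stand-in for the asset body. It shows that joining each key path with "__" yields identifier-safe input names, and that **kwargs receives exactly one value per input.

```python
# Plain-Python sketch (no Dagster involved): the tuples below are hypothetical
# stand-ins for AssetKey.path, and collect_values stands in for the asset body.
from typing import Any, Dict, List, Tuple

# Key paths as they might appear in AssetKey.path; the multi-part key is made up
upstream_key_paths: List[Tuple[str, ...]] = [
    ("for_asset_test1",),
    ("for_asset_test5",),
    ("my_group", "for_asset_test10"),
]

# Mirror the `ins` comprehension: join each key path with "__" so that the
# resulting input name is a valid Python identifier
input_names: Dict[str, Tuple[str, ...]] = {
    "__".join(path): path for path in upstream_key_paths
}


def collect_values(**kwargs: Any) -> List[Any]:
    """Hypothetical stand-in for handle_for_assets: gather every keyword value."""
    return list(kwargs.values())


# Simulate one loaded value per input name; like the question's assets,
# each value is the suffix of the asset name
fake_loaded_values = {
    name: path[-1][len("for_asset_test"):] for name, path in input_names.items()
}

print(sorted(input_names))
print(collect_values(**fake_loaded_values))
```

In the real asset, Dagster loads each upstream value and passes it under the matching input name. If the order of the returned list matters, sorting kwargs by input name is a safer design choice than relying on the order in which the keywords happen to arrive.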