python, testing, dagster

Write a test for a dagster asset job


I am trying to write a simple test for a dagster job and I can't get it to work.

I am using dagster 1.3.6

I have defined this job using the function dagster.define_asset_job:

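from dagster import AssetSelection, define_asset_job
from dagster._core.definitions.unresolved_asset_job_definition import UnresolvedAssetJobDefinition

# define_asset_job returns an unresolved job, not a JobDefinition
my_job: UnresolvedAssetJobDefinition = define_asset_job(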
    name='name_for_my_job',
    selection=AssetSelection.assets(
        source_asset_1,
        source_asset_2,
        asset_1,
        asset_2
    )
)

Intuitive try

By reading the documentation, I figured that I had to call the execute_in_process method, which is defined in the JobDefinition class.

from my_package import my_job
def test_documentation():
    result = my_job.execute_in_process()
    assert result.success

But as I highlighted in the first code block, my_job is of type UnresolvedAssetJobDefinition, which has no execute_in_process method. Digging a bit further into the code, I see that there is a resolve method, which returns a JobDefinition.

So I wanted to use that, but you can't call resolve without arguments; you are required to provide asset_graph.

That is exactly what I was trying to avoid. I don't want to provide the list of assets and source assets myself; I want them to be deduced from the job definition.

Journey

Besides UnresolvedAssetJobDefinition.resolve().execute_in_process(), I also looked at the materialize_to_memory function, but I faced the same issue: I need to provide a list of assets.

I spent some time trying to get the assets out of the UnresolvedAssetJobDefinition. I've seen that there is a .selection property that allows me to get a KeysAssetSelection, which basically contains a list of AssetKey.

But I need a list of Union[AssetsDefinition, SourceAsset] and I don't know how to convert an AssetKey into an AssetsDefinition.

Last try

Below is my last try. You can see that I am just trying to wire things together; as an admission of my weakness, I am not even trying to use the job definition to get the assets.

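from typing import Mapping

import pytest
from dagster._core.definitions.asset_graph import AssetGraph
from my_package import my_job, source_asset_1, source_asset_2, asset_1, asset_2
# parquet_io_manager and DATA_FOLDER are assumed to be defined elsewhere in the project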

@pytest.fixture
def test_resources() -> Mapping[str, object]:
    return {
        "parquet_io_manager": parquet_io_manager.configured({'data_path': DATA_FOLDER }),
    }



def test_my_job(
    test_resources: Mapping[str, object],
):
    graph = AssetGraph.from_assets([source_asset_1, source_asset_2, asset_1, asset_2])
    job = my_job.resolve(asset_graph=graph)
    result = job.execute_in_process(resources=test_resources)
    assert result.success

But I can't quite get what I want. With this last example, I get the following error:

dagster._core.errors.DagsterInvalidSubsetError: AssetKey(s) {AssetKey(['source_asset_1']), AssetKey(['source_asset_2']), AssetKey(['asset_1']), AssetKey(['asset_2'])} were selected, but no AssetsDefinition objects supply these keys. Make sure all keys are spelled correctly, and all AssetsDefinitions are correctly added to the Definitions.

Help

I know that I can test each asset by importing and calling the function decorated with dagster's @asset decorator. But I want to be able to launch all the assets from the job, without having to rewrite the test function whenever the job's selection changes.

Is this possible? Am I doing something wrong? I must be missing something obvious... any help would be appreciated.

Have a nice day!


Solution

  • The object that's produced by define_asset_job does not include object references to the asset definitions selected in the job.

    This means that, to execute an asset job in-process, you need to somehow pass those asset definition object references.

    One way to do this is through a Definitions object:

    from dagster import Definitions, define_asset_job, load_assets_from_modules
    from my_package import my_module
    
    my_job = define_asset_job("my_job", ...)
    all_assets = load_assets_from_modules([my_module])
    
    defs = Definitions(assets=all_assets, jobs=[my_job])
    result = defs.get_job_def("my_job").execute_in_process()