
Testing a dagster pipeline


Summary: Dagster run configurations that work in Dagit appear to be incompatible with pytest in my project.

I've been getting errors when running pytest on a pipeline and would really appreciate any pointers. The errors consistently take this form:

dagster.core.errors.DagsterInvalidConfigError: 
Error in config for pipeline ephemeral_write_myfunc_to_redis_solid_pipeline
Error 1: Undefined field "myfunc_df_to_list" at path root:solids. 
Expected: "{ myfunc_list?: { outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] } 
write_myfunc_to_redis?:..."

A few notes about the project:

  • dagster, version 0.9.15
  • my pipeline runs in Dagit without errors for the same configuration
  • the unit tests run for the individual solids that comprise the pipeline

Failed solutions: I've tried populating the configuration files with solid entries that define the outputs, as each pytest error suggested, but every attempt has led to an error more opaque than the last.

My solids are:

@solid(required_resource_keys={"db"})
def get_myfunc_df(context, query: String) -> myfuncDF:
    # do something with `query` and context.resources.db
    df = ...
    return df

@solid
def myfunc_df_to_list(context, df: myfuncDF) -> List:
    # do something to turn the DataFrame into a list
    records = ...
    return records

@solid(required_resource_keys={"redis"})
def write_myfunc_to_redis(context, myfunc_list: List) -> None:
    # write myfunc_list to redis via context.resources.redis
    return None

My pipeline chains these solids together:

@pipeline(
    mode_defs=filter_modes(MODES),
    preset_defs=filter_presets(PRESETS),
    tags={"type": "myproject"},
)
def myfunc_to_redis_pipeline():
    df = get_myfunc_df()
    myfunc_list = myfunc_df_to_list(df)
    write_myfunc_to_redis(myfunc_list)

My test code in test_main.py is:

    @pytest.mark.myfunc
    def test_myfunc_to_redis_pipeline(self):
        res = execute_pipeline(myfunc_to_redis_pipeline,
                               preset="test",)
        assert res.success
        assert len(res.solid_result_list) == 3  # one result per solid in the pipeline
        for solid_res in res.solid_result_list:
            assert solid_res.success

The preset "test" is defined with the following run configuration in a YAML file:

resources:
  db:
    config:
      file_path: test.csv

This is where most of the errors come from, and I've been iterating through different permutations of solids to add to it, such as:

solids:
  get_myfunc_df:
    inputs:
      query:
        value: select 1

None of these has solved the problem yet. Is there any reason the solids in a test would need their outputs defined, when running the same pipeline in Dagit only requires the input of the first solid to be configured?

Is this error indicative of something else being amiss?
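For the full-pipeline test, the resources-only preset and the solids fragment above end up as a single run config. A minimal sketch of that combination, using plain dicts in place of the two YAML snippets; `deep_merge` is a hypothetical helper, not part of Dagster. Judging by the `?` markers in the expected schemas in the error messages, `outputs` entries are optional, so the only `solids` entry the full pipeline presumably needs is the unconnected `query` input of `get_myfunc_df`.

```python
import copy
from typing import Any, Dict


def deep_merge(base: Dict[str, Any], extra: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of `base` with `extra` merged in recursively.

    Nested dicts are merged key by key; any other value in `extra` replaces
    the value in `base`. Neither input is modified.
    """
    merged = copy.deepcopy(base)
    for key, value in extra.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


# Plain-dict stand-ins for the two YAML snippets in the question
preset_config = {"resources": {"db": {"config": {"file_path": "test.csv"}}}}
solids_fragment = {
    "solids": {"get_myfunc_df": {"inputs": {"query": {"value": "select 1"}}}}
}

# Combined config for running the whole pipeline
pipeline_run_config = deep_merge(preset_config, solids_fragment)
```

The merged dict could then be passed to `execute_pipeline` as `run_config=pipeline_run_config` with `mode="test"` instead of `preset="test"`, assuming the preset uses the "test" mode as the stack trace suggests.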

Edit: Here is the stack trace from tox --verbose:

self = <repos.myfunc.myfunc.dagster.tests.test_main.Test_myfunc testMethod=test_myfunc_df>

    @pytest.mark.myfunc
    def test_myfunc_df(self):
        """myfunc"""
        result = execute_solid(
            get_myfunc_df,
            mode_def=test_mode,
            run_config=run_config,
>           input_values={"query": "SELECT 1"},
        )

repos/myfunc/myfunc/dagster/tests/test_main.py:29:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/utils/test/__init__.py:324: in execute_solid
    raise_on_error=raise_on_error,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:335: in execute_pipeline
    raise_on_error=raise_on_error,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/telemetry.py:90: in wrap
    result = f(*args, **kwargs)
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:375: in _logged_execute_pipeline
    tags=tags,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/instance/__init__.py:586: in create_run_for_pipeline
    pipeline_def, run_config=run_config, mode=mode,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:644: in create_execution_plan
    environment_config = EnvironmentConfig.build(pipeline_def, run_config, mode=mode)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

pipeline_def = <dagster.core.definitions.pipeline.PipelineDefinition object at 0x1359f6210>
run_config = {'resources': {'ge_data_context': {'config': {'ge_root_dir': '/Users/this_user/Workspace/drizly-dagster/repos/datas...cause_you_bought/dagster/tests/test.csv'}}}, 'solids': {'get_myfunc_df': {'inputs': {'query': {'value': 'select 1'}}}}}
mode = 'test'

    @staticmethod
    def build(pipeline_def, run_config=None, mode=None):
        """This method validates a given run config against the pipeline config schema. If
        successful, we instantiate an EnvironmentConfig object.

        In case the run_config is invalid, this method raises a DagsterInvalidConfigError
        """
        from dagster.config.validate import process_config
        from dagster.core.definitions.executor import ExecutorDefinition
        from dagster.core.definitions.intermediate_storage import IntermediateStorageDefinition
        from dagster.core.definitions.system_storage import SystemStorageDefinition
        from .composite_descent import composite_descent

        check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition)
        run_config = check.opt_dict_param(run_config, "run_config")
        check.opt_str_param(mode, "mode")

        mode = mode or pipeline_def.get_default_mode_name()
        environment_type = create_environment_type(pipeline_def, mode)

        config_evr = process_config(environment_type, run_config)
        if not config_evr.success:
            raise DagsterInvalidConfigError(
                "Error in config for pipeline {}".format(pipeline_def.name),
                config_evr.errors,
>               run_config,
            )
E           dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeline ephemeral_get_myfunc_df_solid_pipeline
E               Error 1: Undefined field "inputs" at path root:solids:get_myfunc_df. Expected: "{ outputs?: [{ result?: { csv: { path: (String | { env: String }) sep?: (String | { env: String }) } parquet: { path: (String | { env: String }) } pickle: { path: (String | { env: String }) } table: { path: (String | { env: String }) } } }] }".

.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/system_config/objects.py:101: DagsterInvalidConfigError
_______________________________________________________________________ Test_myfunc.test_write_myfunc_to_redis ________________________________________________________________________

self = <repos.myfunc.myfunc.dagster.tests.test_main.Test_myfunc testMethod=test_write_myfunc_to_redis>

    @pytest.mark.myfunc
    def test_write_myfunc_to_redis(self):
        """Test redis write"""
        records = [
            ("k", "v"),
            ("k2", "v2"),
        ]
        result = execute_solid(
            write_myfunc_to_redis,
            mode_def=test_mode,
            input_values={"myfunc_list": records},
>           run_config=run_config,
        )

repos/myfunc/myfunc/dagster/tests/test_main.py:56:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/utils/test/__init__.py:324: in execute_solid
    raise_on_error=raise_on_error,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:335: in execute_pipeline
    raise_on_error=raise_on_error,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/telemetry.py:90: in wrap
    result = f(*args, **kwargs)
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:375: in _logged_execute_pipeline
    tags=tags,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/instance/__init__.py:586: in create_run_for_pipeline
    pipeline_def, run_config=run_config, mode=mode,
.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/execution/api.py:644: in create_execution_plan
    environment_config = EnvironmentConfig.build(pipeline_def, run_config, mode=mode)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

pipeline_def = <dagster.core.definitions.pipeline.PipelineDefinition object at 0x135d39490>
run_config = {'resources': {'ge_data_context': {'config': {'ge_root_dir': '/Users/this_user/Workspace/drizly-dagster/repos/datas...cause_you_bought/dagster/tests/test.csv'}}}, 'solids': {'get_myfunc_df': {'inputs': {'query': {'value': 'select 1'}}}}}
mode = 'test'

    @staticmethod
    def build(pipeline_def, run_config=None, mode=None):
        """This method validates a given run config against the pipeline config schema. If
        successful, we instantiate an EnvironmentConfig object.

        In case the run_config is invalid, this method raises a DagsterInvalidConfigError
        """
        from dagster.config.validate import process_config
        from dagster.core.definitions.executor import ExecutorDefinition
        from dagster.core.definitions.intermediate_storage import IntermediateStorageDefinition
        from dagster.core.definitions.system_storage import SystemStorageDefinition
        from .composite_descent import composite_descent

        check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition)
        run_config = check.opt_dict_param(run_config, "run_config")
        check.opt_str_param(mode, "mode")

        mode = mode or pipeline_def.get_default_mode_name()
        environment_type = create_environment_type(pipeline_def, mode)

        config_evr = process_config(environment_type, run_config)
        if not config_evr.success:
            raise DagsterInvalidConfigError(
                "Error in config for pipeline {}".format(pipeline_def.name),
                config_evr.errors,
>               run_config,
            )
E           dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeline ephemeral_write_myfunc_to_redis_solid_pipeline
E               Error 1: Undefined field "get_myfunc_df" at path root:solids. Expected: "{ myfunc_list?: { outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] } write_myfunc_to_redis?: { outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] } }".

.tox/repo-myfunc/lib/python3.7/site-packages/dagster/core/system_config/objects.py:101: DagsterInvalidConfigError
=============================================================================== short test summary info ===============================================================================
FAILED repos/myfunc/myfunc/dagster/tests/test_main.py::Test_myfunc::test_myfunc_df - dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeli...
FAILED repos/myfunc/myfunc/dagster/tests/test_main.py::Test_myfunc::test_write_myfunc_to_redis - dagster.core.errors.DagsterInvalidConfigError: Error in conf

The solution below works. The key issue was that the shared run config defines the solids as the full pipeline expects them, and each solid test was passed both that same config and input_values. My change was to remove input_values as an argument and pass the inputs through the run configuration instead. Since my intermediate solids take more complex objects and my configuration file is YAML, I made the following addition to all of my solid tests:

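        import copy

        import pandas as pd

        # Start from the shared run config, then give the solid under test its inputs
        this_solid_run_config = copy.deepcopy(run_config)
        # One row with key="1", value="2" (a flat list would not fit two columns)
        input_dict = {"df": pd.DataFrame([["1", "2"]], columns=["key", "value"])}
        # update() replaces the entire "solids" section of the copied config
        this_solid_run_config.update(
            {"solids": {"myfunc_df_to_list": {"inputs": input_dict}}}
        )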
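Since the same copy-and-replace step is repeated in every solid test, it could be factored into a small helper. A minimal sketch; `solid_run_config` is a hypothetical name, and the exact shape of each input value under `inputs` depends on the input type's loader, which this sketch does not check.

```python
import copy
from typing import Any, Dict, Optional


def solid_run_config(
    base_run_config: Dict[str, Any],
    solid_name: str,
    inputs: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Build a run config for testing one solid in isolation (hypothetical helper).

    Keeps everything from the shared config (resources etc.), drops config
    meant for other solids in the full pipeline, and optionally supplies
    the solid's inputs through config instead of input_values.
    """
    cfg = copy.deepcopy(base_run_config)
    cfg.pop("solids", None)  # entries for other solids would be rejected
    if inputs:
        cfg["solids"] = {solid_name: {"inputs": copy.deepcopy(inputs)}}
    return cfg
```

A test would then call, for example, `execute_solid(myfunc_df_to_list, mode_def=test_mode, run_config=solid_run_config(run_config, "myfunc_df_to_list", input_dict))` without any `input_values`.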

Solution

  • Based on the stack trace, the failure comes from this call:

    result = execute_solid(
                get_myfunc_df,
                mode_def=test_mode,
                run_config=run_config,
                input_values={"query": "SELECT 1"},
            )
    

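    The solid input "query" should be passed through either the input_values parameter or the run_config parameter, but not both. Judging by the error messages, when input_values is given, execute_solid builds an ephemeral pipeline in which that input is fed by an upstream solid named after the input, so the input is no longer configurable and the "inputs" entry is rejected (the first error). That ephemeral pipeline contains only the solid under test plus those input solids, which is also why config for get_myfunc_df is rejected when testing write_myfunc_to_redis (the second error, whose expected fields are only myfunc_list and write_myfunc_to_redis). Happy to keep digging if that doesn't resolve your issue.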
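The two failures in the trace above can be caught before calling Dagster with a plain-dict check. A minimal sketch; `ephemeral_config_problems` is a hypothetical helper, and it assumes (from the second error message) that the ephemeral pipeline contains the solid under test plus one solid per `input_values` key, named after that input. It only looks for these two specific problems, not full config validity.

```python
from typing import Any, Dict, List


def ephemeral_config_problems(
    run_config: Dict[str, Any], solid_name: str, input_values: Dict[str, Any]
) -> List[str]:
    """List run-config entries an execute_solid call would likely reject."""
    solids_cfg = run_config.get("solids") or {}
    # Solids assumed to exist in the ephemeral pipeline
    allowed = {solid_name, *input_values}
    problems = []
    # Config for solids that are not part of the ephemeral pipeline
    for name in solids_cfg:
        if name not in allowed:
            problems.append(f'config for undefined solid "{name}"')
    # Inputs supplied both through input_values and through run_config
    configured_inputs = (solids_cfg.get(solid_name) or {}).get("inputs") or {}
    for name in configured_inputs:
        if name in input_values:
            problems.append(f'input "{name}" passed via both input_values and run_config')
    return sorted(problems)
```

With the shared config from the trace, checking `get_myfunc_df` with `input_values={"query": "SELECT 1"}` reports the doubled `query` input, and checking `write_myfunc_to_redis` with `input_values={"myfunc_list": records}` reports the stray `get_myfunc_df` entry, matching the two errors.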