How to provide parameters defined at the source asset declaration to the IO Manager?


My initial task was to create an IO manager that connects to a database and returns data as a pandas DataFrame.

(I am using dagster 1.3.10)

Design

IMO, the credentials (ip, port, user, password) must be parameters of the IO manager because I want different resources for different credentials. But the other interesting parameters that can be used to perform a database query (select fields, optional filters, sorting, limit, ...) should be linked to an asset definition.

I had no trouble creating the credentials parameter, like this:

@io_manager(
    config_schema={
        'ip': Field(str, is_required=True),
        'port': Field(int, default_value=5432),
        'username': Field(str, is_required=True),
        'password': Field(str, is_required=True)
    }
)
def database_io_manager(init_context):
    return DatabaseIOManager(
        ip=init_context.resource_config.get('ip'),
        port=init_context.resource_config.get('port'),
        username=init_context.resource_config.get('username'),
        password=init_context.resource_config.get('password'),
    )

Then I pass this function in the resources dict given to Definitions:

defs = Definitions(resources={'database_io_manager': database_io_manager})

Now I can use this IO manager in my asset definitions:

@asset(io_manager_key='database_io_manager')
def my_asset():
    pass

Now, as I said, I want the query parameters to live at the asset level, so I created a configuration class.

from typing import List

import pydantic
from dagster import Config


class DatabaseConfig(Config):
    # Columns to select; ['*'] selects every column.
    fields: List[str] = pydantic.Field()

I provide this configuration to the asset in the metadata attribute.

@asset(io_manager_key='database_io_manager', metadata={'io_manager': DatabaseConfig(fields=['*'])})
def my_asset():
    pass

And I can use this in my IO manager with a custom method

def load_metadata(self, context: Union[InputContext, OutputContext]) -> None:
    config: DatabaseConfig = context.metadata.get("io_manager")
    if not isinstance(config, DatabaseConfig):
        raise ValueError('wrong config type')
    self.fields = config.fields
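
To illustrate how asset-level parameters such as the selected fields might end up in a query, here is a minimal sketch. The function `build_select_query` and its `table` and `limit` parameters are hypothetical names introduced for this example; they are not part of Dagster or of the IO manager above.

```python
from typing import List, Optional


def build_select_query(table: str, fields: List[str], limit: Optional[int] = None) -> str:
    """Build a simple SELECT statement from asset-level parameters.

    Identifiers are interpolated directly into the SQL text, so they must come
    from trusted, code-defined configuration and never from user input.
    """
    # Fall back to every column when no field is listed.
    columns = ", ".join(fields) if fields else "*"
    query = f"SELECT {columns} FROM {table}"
    if limit is not None:
        query += f" LIMIT {int(limit)}"
    return query


print(build_select_query("my_table", ["id", "name"], limit=10))
# SELECT id, name FROM my_table LIMIT 10
```

In this setup the IO manager would call such a helper with `self.fields` once the metadata has been loaded, while the connection itself is still built from the credentials held by the resource.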

Problem

This works with assets defined through @asset but not with SourceAsset.

If I define a source asset like this:

my_source_asset = SourceAsset(
    key='my_source_asset',
    io_manager_key='database_io_manager',
    metadata={'io_manager': DatabaseConfig(fields=['*'])}
)

I can see the metadata associated with this source asset in dagit, but when the asset is actually loaded, the metadata dict is empty.

Is it a bug? Am I missing something?

Other (minor) problems

unserializable

I tried to provide a minimal replication example and in the process of doing so I encountered other issues.

The first one that bugs me is that this DatabaseConfig object is not displayed by dagit; it shows 'unserializable' instead. Yet I am extending the Config class, and calling its json() method works fine.

Bonus 1: What can I do to make the DatabaseConfig class serializable as dagit wants it?

zero io manager use

With the code at the end of this question, dagit shows zero uses of my IO managers.

Bonus 2: Why can't I see the IO manager uses?


# minimal_example_bug_dagster.py
from __future__ import annotations
import pickle
from typing import Union
import pydantic

from dagster import (
    Config,
    Definitions,
    InputContext,
    OutputContext,
    SourceAsset,
    asset,
    IOManager,
    fs_io_manager,
    io_manager,
)


class CustomIOConfig(Config):
    custom_file_name: str = pydantic.Field()


class CustomIOManager(IOManager):
    my_attribute: str = None

    def get_key(self, context: Union[InputContext, OutputContext]) -> str:
        # The last path component is the asset's own name.
        return context.asset_key.path[-1]

    def load_metadata(self, context: Union[InputContext, OutputContext]) -> None:
        context.log.info(context.metadata)
        config: CustomIOConfig = context.metadata.get("io_manager")
        self.my_attribute = config.custom_file_name

    def load_input(self, context: InputContext) -> str:
        context.log.info(f"Inside load_input for {self.get_key(context)}")
        self.load_metadata(context)
        # Return the unpickled value so downstream assets actually receive it.
        with open(self.my_attribute, "rb") as file:
            return pickle.load(file)

    def handle_output(self, context: OutputContext, obj: str) -> None:
        context.log.info(f"Inside handle_output for {self.get_key(context)}")
        self.load_metadata(context)
        with open(self.my_attribute, "wb") as file:
            pickle.dump(obj, file)


@asset(
    metadata={"io_manager": CustomIOConfig(custom_file_name="foo")},
    io_manager_key="custom_io_manager",
)
def my_asset():
    return "Hello"


my_source_asset = SourceAsset(
    "my_source_asset",
    metadata={"io_manager": CustomIOConfig(custom_file_name="bar")},
    io_manager_key="custom_io_manager",
)


@asset(io_manager_key="fs_io_manager")
def using_both_assets(my_asset, my_source_asset):
    return f"{my_asset}, {my_source_asset}"


@io_manager
def custom_io_manager(init_context):
    return CustomIOManager()


defs = Definitions(
    assets=[my_asset, my_source_asset, using_both_assets],
    resources={"fs_io_manager": fs_io_manager, "custom_io_manager": custom_io_manager},
)

Solution

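  • The key was to read the metadata through the input context's upstream_output property. In load_input, context.metadata holds the metadata of the input definition, whereas the metadata declared on the upstream asset or source asset is available as context.upstream_output.metadata.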

    class CustomIOConfig(Config):
        custom_file_name: str = pydantic.Field()
    
    class CustomIOManager(IOManager):
        my_attribute: str = None
    
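        def load_metadata(self, context: Union[InputContext, OutputContext]) -> None:
            # On the input side, the upstream asset's metadata lives on upstream_output;
            # OutputContext has no such property and carries the metadata itself.
            source = context.upstream_output if isinstance(context, InputContext) else context
            config: CustomIOConfig = source.metadata.get("io_manager")
            self.my_attribute = config.custom_file_name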
        
    @asset(
        metadata={"io_manager": CustomIOConfig(custom_file_name="foo")},
        io_manager_key="custom_io_manager",
    )
    def my_asset():
        return "Hello"
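
Regarding Bonus 1, one option that may be worth trying is to store a plain dict in the metadata instead of the Config object and rebuild the typed object inside the IO manager. As far as I know, Dagster can render dicts as JSON metadata. The sketch below uses a standard-library dataclass as a stand-in for CustomIOConfig so that it runs without Dagster. `to_metadata` and `from_metadata` are hypothetical helper names; with the real class, `config.dict()` and `CustomIOConfig(**raw)` would presumably play the same roles, assuming the IO manager receives the dict back unchanged.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict, Mapping


@dataclass
class CustomIOConfigStandIn:
    """Stand-in for CustomIOConfig (assumption: same single field)."""

    custom_file_name: str


def to_metadata(config: CustomIOConfigStandIn) -> Dict[str, Any]:
    """Return a metadata dict that holds only JSON-friendly values."""
    return {"io_manager": asdict(config)}


def from_metadata(metadata: Mapping[str, Any]) -> CustomIOConfigStandIn:
    """Rebuild the typed config from the plain dict stored in the metadata."""
    raw = metadata.get("io_manager")
    if not isinstance(raw, Mapping):
        raise ValueError("expected a dict under the 'io_manager' metadata key")
    return CustomIOConfigStandIn(**raw)


metadata = to_metadata(CustomIOConfigStandIn(custom_file_name="foo"))
print(metadata)
print(from_metadata(metadata).custom_file_name)
```

The trade-off is that validation moves from asset definition time to load time, so a typo in the dict only surfaces when the IO manager rebuilds the config.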