Dagster chaining resources


I've recently picked up Dagster to evaluate it as an alternative to Airflow.

I haven't been able to wrap my head around the concept of resources, and I'd like to understand whether what I'm trying to do is possible or could be achieved better in a different way.

I have a helper class like the one below that helps keep my code DRY:

from dagster import resource, solid, ModeDefinition, pipeline
from dagster_aws.s3 import s3_resource

class HelperAwsS3:
    def __init__(self, s3_resource):
        self.s3_resource = s3_resource

    def s3_list_bucket(self, bucket, prefix):
        return self.s3_resource.list_objects_v2(
            Bucket=bucket,
            Prefix=prefix
        )

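    def s3_download_file(self, bucket, file, local_path):
        # Assumes s3_resource yields a boto3 S3 client (as list_objects_v2 above implies),
        # which exposes download_file directly, without a .meta.client hop
        self.s3_resource.download_file(
            Bucket=bucket,
            Key=file,
            Filename=local_path
        )

    def s3_upload_file(self, bucket, file, local_path):
        # Same assumption: upload_file is called on the client itself
        self.s3_resource.upload_file(
            Bucket=bucket,
            Key=file,
            Filename=local_path
        )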

The s3_resource is actually dagster_aws.s3.s3_resource, which helps me connect to AWS using my local AWS credentials.

I am not sure how to pass s3_resource to HelperAwsS3 when I construct it in the @resource function below.

@resource
def connection_helper_aws_s3_resource(context):
    return HelperAwsS3()

Any pointers please? Or am I doing it all wrong and it needs doing in a different way?

Thanks for your help.


Solution

  • I posted the same question on the Dagster Slack channel and quickly had a reply from the helpful team. Posting it here in case it helps someone:

    Keep your HelperAwsS3 class and write your own resource that uses the s3 resource. It could look something like this:

    @resource(required_resource_keys={"s3"})
    def connection_helper_aws_s3_resource(context):
        return HelperAwsS3(s3_resource=context.resources.s3)
    

    Then be sure to include both the s3 resource and your custom resource in your mode definition:

    @pipeline(mode_defs=[ModeDefinition(
      resource_defs={"s3": s3_resource, "connection_helper_aws_s3": connection_helper_aws_s3_resource}
    )])
    def my_pipeline():  # hypothetical name; the original elided the pipeline definition
      ...
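
To see what the chained resource does without Dagster or AWS in the loop, here is a minimal sketch of the same wiring. It is not Dagster code: FakeS3Client, build_helper, fake_context, and the bucket and prefix names are hypothetical stand-ins, and the fake context only imitates the context.resources.s3 attribute that Dagster would populate from required_resource_keys.

```python
from types import SimpleNamespace


class FakeS3Client:
    """Hypothetical stand-in for the S3 client that the s3 resource would provide."""

    def list_objects_v2(self, Bucket, Prefix):
        # Return a response shaped loosely like a real listing, with one fake key.
        return {"KeyCount": 1, "Contents": [{"Key": f"{Prefix}example.csv"}]}


class HelperAwsS3:
    """Trimmed copy of the helper from the question (listing only)."""

    def __init__(self, s3_resource):
        self.s3_resource = s3_resource

    def s3_list_bucket(self, bucket, prefix):
        return self.s3_resource.list_objects_v2(Bucket=bucket, Prefix=prefix)


def build_helper(context):
    # Same body as the resource function in the answer, minus the @resource decorator.
    return HelperAwsS3(s3_resource=context.resources.s3)


# Imitate the context Dagster passes in: resources.s3 holds the already-built s3 resource.
fake_context = SimpleNamespace(resources=SimpleNamespace(s3=FakeS3Client()))

helper = build_helper(fake_context)
listing = helper.s3_list_bucket("my-bucket", "raw/")
print(listing["Contents"][0]["Key"])
```

In a real pipeline, a solid would request the helper in the same way the helper requests s3: declare required_resource_keys={"connection_helper_aws_s3"} on the solid and reach the object through context.resources.connection_helper_aws_s3. Keeping HelperAwsS3 as a plain class also means it can be unit-tested with a fake client like the one above.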