I've recently picked up Dagster to evaluate as an alternative to Airflow.
I haven't been able to wrap my head around the concept of resources, and I'm looking to understand whether what I'm trying to do is possible or can be achieved better in a different way.
I have a helper class, like the one below, that helps keep my code DRY:
from dagster import resource, solid, ModeDefinition, pipeline
from dagster_aws.s3 import s3_resource


class HelperAwsS3:
    """Thin wrapper around an S3 client to keep the S3 calls DRY."""

    def __init__(self, s3_resource):
        # Expects the boto3 S3 client that dagster_aws.s3.s3_resource provides
        self.s3_resource = s3_resource

    def s3_list_bucket(self, bucket, prefix):
        return self.s3_resource.list_objects_v2(
            Bucket=bucket,
            Prefix=prefix
        )

    def s3_download_file(self, bucket, file, local_path):
        self.s3_resource.download_file(
            Bucket=bucket,
            Key=file,
            Filename=local_path
        )

    def s3_upload_file(self, bucket, file, local_path):
        self.s3_resource.upload_file(
            Filename=local_path,
            Bucket=bucket,
            Key=file
        )
The s3_resource here is actually dagster_aws.s3.s3_resource, which connects to AWS using my local AWS credentials.
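For reference, outside Dagster I can exercise the helper directly with a plain boto3 client; a quick sketch (the bucket, prefix, and file names below are just placeholders):

import boto3

# Standalone sketch: "my-bucket", "incoming/" and the file paths are hypothetical
helper = HelperAwsS3(s3_resource=boto3.client("s3"))
listing = helper.s3_list_bucket(bucket="my-bucket", prefix="incoming/")
helper.s3_download_file(bucket="my-bucket", file="incoming/data.csv", local_path="/tmp/data.csv")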
I am not sure how to pass the s3_resource to HelperAwsS3 when I make the call in the @resource definition below.
@resource
def connection_helper_aws_s3_resource(context):
    return HelperAwsS3()
Any pointers, please? Or am I going about this the wrong way and need to do it differently?
Thanks for your help.
I posted the same question on the Dagster Slack channel and quickly had a reply from the helpful team. Posting it here in case it helps someone:
Keep your HelperAwsS3 class and write your own resource that uses the s3 resource. It could look something like this:
@resource(required_resource_keys={"s3"})
def connection_helper_aws_s3_resource(context):
    return HelperAwsS3(s3_resource=context.resources.s3)
And then be sure to include both the s3 resource and your custom resource in your mode definition:
@pipeline(mode_defs=[ModeDefinition(
    resource_defs={"s3": s3_resource, "connection_helper_aws_s3": connection_helper_aws_s3_resource}
)])
def my_pipeline():
    ...
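To round things off, here's a minimal sketch of a solid that consumes the custom resource (the solid name, bucket, and prefix are placeholders I've assumed):

@solid(required_resource_keys={"connection_helper_aws_s3"})
def list_incoming_files(context):
    # The HelperAwsS3 instance built by connection_helper_aws_s3_resource
    helper = context.resources.connection_helper_aws_s3
    # "my-bucket" and "incoming/" are hypothetical values
    response = helper.s3_list_bucket(bucket="my-bucket", prefix="incoming/")
    for obj in response.get("Contents", []):
        context.log.info(obj["Key"])

When this solid runs inside the pipeline above, Dagster constructs the s3 resource first, hands it to connection_helper_aws_s3_resource, and exposes the resulting helper on context.resources.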