I've set up a sensor in Dagster:
@sensor(job=excel_to_csv)
def convert_unit_list(ctx: SensorEvaluationContext):
    env = os.getenv("ENVIRONMENT")
    region = os.getenv("AWS_REGION")
    appcfg = AppConfig("unit-ingestion", "developer-units", env, region)
    bucket = Bucket.LANDING
    bucket_name = bucket.format(env)
    prefix = "unit-ingestion/developer-units"

    ctx.log.info(f"Searching {bucket_name}/{prefix} for new files...")
    # The cursor holds the last S3 key a run was already requested for.
    since_key = ctx.cursor
    keys = get_s3_keys(bucket_name, prefix, since_key)
    if not keys:
        return SkipReason(f"No keys found in {bucket_name}/{prefix}")

    run_config = {
        "s3": S3Resource(region_name=region),
        "conv_ctx": ExcelConversionContext(appcfg, bucket, "unit-ingestion/units", 1, "B:BV"),
    }

    ctx.log.info(f"Found {len(keys)} new keys in {bucket_name}/{prefix}")
    for key in keys:
        yield RunRequest(key, run_config=run_config)
        ctx.log.info(f"Updating cursor to {key}")
        ctx.update_cursor(key)
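(get_s3_keys isn't shown here; it's assumed to behave like the dagster_aws.s3.sensor helper of the same name, or a custom equivalent: list the keys under the prefix and return only those that come after the cursor. A rough boto3-only sketch of that assumed behaviour:

import boto3

def get_s3_keys(bucket, prefix="", since_key=None):
    """List keys under the prefix and return only those after since_key."""
    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")
    keys = [
        obj["Key"]
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix)
        for obj in page.get("Contents", [])
    ]
    if since_key and since_key in keys:
        # Drop everything up to and including the cursor key.
        return keys[keys.index(since_key) + 1:]
    return keys
)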
Now, I'd like to write an additional sensor that runs whenever excel_to_csv
fails. I've got it stubbed out already:
@run_failure_sensor(request_job=excel_to_csv)
def excel_to_csv_failure(ctx: RunFailureSensorContext):
    ctx.log.error(f"{ctx.sensor_name} failed to process {ctx.failure_event.asset_key}")
However, I'd like to get the run_config so I can use it to move the failed Excel file to a rejected S3 bucket. I've looked through ctx, and it looks like ctx.dagster_event.step_input_data could be what I want here, but I'm not sure, as the documentation doesn't list all the fields for this type. Does anyone know whether this is possible and, if it is, where I can find this data?
The RunFailureSensorContext object has a dagster_run member (a DagsterRun) that points to the run that failed, and that run carries the run_config it was launched with:
@run_failure_sensor(request_job=excel_to_csv)
def excel_to_csv_failure(ctx: RunFailureSensorContext):
    ctx.log.error(f"{ctx.sensor_name} failed to process {ctx.failure_event.asset_key}")
    failed_run_config = ctx.dagster_run.run_config
    # do stuff with run_config
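From there, a sketch of the "move to a rejected bucket" step could look like the following. It assumes convert_unit_list is changed to attach the S3 key to the run, for example via RunRequest's tags argument under a made-up name source_s3_key, and that REJECTED_BUCKET names the target bucket; excel_to_csv and Bucket come from the question's code. S3 has no move operation, so this copies and then deletes.

import os
import boto3
from dagster import RunFailureSensorContext, run_failure_sensor

@run_failure_sensor(request_job=excel_to_csv)
def excel_to_csv_failure(ctx: RunFailureSensorContext):
    failed_run = ctx.dagster_run
    failed_run_config = failed_run.run_config  # the config the failed run was launched with
    ctx.log.error(f"{ctx.sensor_name}: run {failed_run.run_id} failed with config {failed_run_config}")

    # Hypothetical tag: convert_unit_list would need to set it when building the RunRequest,
    # e.g. RunRequest(key, run_config=run_config, tags={"source_s3_key": key})
    source_key = failed_run.tags.get("source_s3_key")
    if source_key is None:
        ctx.log.error("Failed run has no source_s3_key tag; nothing to move")
        return

    landing_bucket = Bucket.LANDING.format(os.getenv("ENVIRONMENT"))  # as in the question's sensor
    rejected_bucket = os.getenv("REJECTED_BUCKET")  # hypothetical target bucket

    s3 = boto3.client("s3", region_name=os.getenv("AWS_REGION"))
    # Copy to the rejected bucket, then remove the original from landing.
    s3.copy_object(
        Bucket=rejected_bucket,
        Key=source_key,
        CopySource={"Bucket": landing_bucket, "Key": source_key},
    )
    s3.delete_object(Bucket=landing_bucket, Key=source_key)
    ctx.log.info(f"Moved {source_key} from {landing_bucket} to {rejected_bucket}")

The tag is the only piece not already in place; if you instead bake the key into run_config (for instance as op config) when building the RunRequest, reading it back out of failed_run_config works the same way.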