
Get run config from Dagster RunFailureSensorContext


I've set up a sensor in Dagster:

import os

from dagster import RunRequest, SensorEvaluationContext, SkipReason, sensor
from dagster_aws.s3 import S3Resource

# AppConfig, Bucket, ExcelConversionContext, get_s3_keys and the excel_to_csv
# job are defined elsewhere in the project.

@sensor(job=excel_to_csv)
def convert_unit_list(ctx: SensorEvaluationContext):

    env = os.getenv("ENVIRONMENT")
    region = os.getenv("AWS_REGION")
    appcfg = AppConfig("unit-ingestion", "developer-units", env, region)

    bucket = Bucket.LANDING
    bucket_name = bucket.format(env)
    prefix = "unit-ingestion/developer-units"
    ctx.log.info(f"Searching {bucket_name}/{prefix} for new files...")

    since_key = ctx.cursor
    keys = get_s3_keys(bucket_name, prefix, since_key)
    if not keys:
        # This function yields below, so it is a generator and a *returned*
        # SkipReason would be silently dropped; yield it and stop instead.
        yield SkipReason(f"No keys found in {bucket_name}/{prefix}")
        return

    run_config = {
        "s3": S3Resource(region_name=region),
        "conv_ctx": ExcelConversionContext(appcfg, bucket, "unit-ingestion/units", 1, "B:BV"),
    }

    ctx.log.info(f"Found {len(keys)} new keys in {bucket_name}/{prefix}")
    for key in keys:

        yield RunRequest(key, run_config=run_config)

        ctx.log.info(f"Updating cursor to {key}")
        ctx.update_cursor(key)

Now, I'd like to write an additional sensor that runs whenever excel_to_csv fails. I've got it stubbed out already:

@run_failure_sensor(request_job=excel_to_csv)
def excel_to_csv_failure(ctx: RunFailureSensorContext):

    ctx.log.error(f"{ctx.sensor_name} failed to process {ctx.failure_event.asset_key}")

However, I'd like to get the run_config so I can use it to move the failed Excel file to the rejected S3 bucket. Looking through ctx, ctx.dagster_event.step_input_data could be what I want, but I'm not sure, because the documentation doesn't list all the fields for this type. Does anyone know whether this is possible and, if so, where I can find this data?


Solution

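  • The RunFailureSensorContext object has a dagster_run property (a DagsterRun) describing the run that failed, and its run_config attribute holds the config that run was launched with. Also note that request_job names the job the sensor launches when it fires; to watch excel_to_csv for failures, pass it through monitored_jobs instead.

        from dagster import RunFailureSensorContext, run_failure_sensor

        @run_failure_sensor(monitored_jobs=[excel_to_csv])
        def excel_to_csv_failure(ctx: RunFailureSensorContext):
        
            ctx.log.error(f"{ctx.sensor_name} failed to process {ctx.failure_event.asset_key}")
            # The config dict the failed run was launched with (what was passed to RunRequest)
            failed_run_config = ctx.dagster_run.run_config
            # use failed_run_config here, e.g. to move the failed file to the rejected bucket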
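Once the failed run's config is available, the "move to rejected" step itself is plain S3 work. S3 has no rename or move call, so a move is a copy followed by a delete. The sketch below assumes a boto3-style client and assumes the S3 key is stored in the run config under a hypothetical op named "convert" with a "key" config field; in the question's sensor the key is only passed as the RunRequest run key, so it would have to be added to run_config (and to the op's config schema) first. The helper names, the op name, and the "rejected/" prefix are all illustrative, not part of Dagster or the original code.

```python
from typing import Any, Dict, Mapping, Protocol


class S3Client(Protocol):
    """The two boto3 S3 client methods this sketch relies on."""

    def copy_object(self, **kwargs: Any) -> Any: ...

    def delete_object(self, **kwargs: Any) -> Any: ...


def build_run_config(key: str) -> Dict[str, Any]:
    # Hypothetical layout: the polling sensor puts the S3 key into the config
    # of an op named "convert" so that it travels with the run.
    return {"ops": {"convert": {"config": {"key": key}}}}


def key_from_run_config(run_config: Mapping[str, Any]) -> str:
    # Reads the key back out of ctx.dagster_run.run_config (same assumed layout).
    return run_config["ops"]["convert"]["config"]["key"]


def move_to_rejected(
    s3: S3Client,
    key: str,
    source_bucket: str,
    rejected_bucket: str,
    rejected_prefix: str = "rejected/",
) -> str:
    """Move one object by copying it and then deleting the original.

    Returns the key the object was copied to.
    """
    dest_key = rejected_prefix + key.rsplit("/", 1)[-1]
    s3.copy_object(
        Bucket=rejected_bucket,
        Key=dest_key,
        CopySource={"Bucket": source_bucket, "Key": key},
    )
    # delete_object only runs if copy_object returned without raising.
    s3.delete_object(Bucket=source_bucket, Key=key)
    return dest_key
```

Inside the failure sensor this might be called as move_to_rejected(boto3.client("s3"), key_from_run_config(ctx.dagster_run.run_config), source_bucket, rejected_bucket), with the bucket names coming from the same configuration the polling sensor uses. Passing the client in, rather than creating it inside the helper, keeps the helper testable with a stub.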