What is the proper partition config for a Dagster job?


I am currently hitting a dagster.core.errors.PartitionExecutionError, but the error logs from Dagster are not clear to me.

dagster.core.errors.PartitionExecutionError: Error occurred during the evaluation of the `run_config_for_partition` function for partition set download_firebase_data_local_partition_set 
  File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/grpc/impl.py", line 292, in get_partition_config
    return ExternalPartitionConfigData(name=partition.name, run_config=run_config)
  File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/core/errors.py", line 192, in user_code_error_boundary
    raise error_cls(
The above exception was caused by the following exception:

TypeError: daily_download_config() takes 1 positional argument but 2 were given 
  File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/core/errors.py", line 185, in user_code_error_boundary
    yield
  File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/grpc/impl.py", line 291, in get_partition_config
    run_config = partition_set_def.run_config_for_partition(partition)
  File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/core/definitions/partition.py", line 441, in run_config_for_partition
    return copy.deepcopy(self._user_defined_run_config_fn_for_partition(partition))
  File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/core/definitions/time_window_partitions.py", line 192, in <lambda>
    run_config_for_partition_fn=lambda partition: fn(

My current setup is:

@graph
def download():
    """
    Download data from BigQuery then upload to S3
    """
    extract_data_in_date()


@daily_partitioned_config(start_date=datetime(2021, 12, 1))
def daily_download_config(date: datetime):
    return {
        "resources": {
            "date": date.strftime("%Y-%m-%d")
        }
    }

download_local_job = download.to_job(
    name=f'{NAME}_local',
    resource_defs={
        **{
            "date": make_values_resource(date=str),
            "project_name": ResourceDefinition.hardcoded_resource("test-123")
        },
        **RESOURCES_LOCAL,
    },
    config=daily_download_config,
    executor_def=in_process_executor
)

I am not sure what I am doing wrong. Can you please help?


Solution

  • The function decorated with @daily_partitioned_config needs to accept two arguments: one for the start of the time window and one for the end. daily_download_config doesn't make use of the end value, but it still needs to appear in the signature because Dagster passes two arguments to this function regardless.
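
As a rough illustration of the point above, here is a minimal sketch of the two-argument signature. To keep it runnable without Dagster installed, the decorator is left off and a hypothetical helper, call_like_dagster, stands in for the way Dagster passes both ends of the time window; the names _end, call_like_dagster and old_config are assumptions made for this example, and the returned dict is unchanged from the question.

```python
from datetime import datetime, timedelta


# In the real job this function keeps its decorator from the question:
#     @daily_partitioned_config(start_date=datetime(2021, 12, 1))
# It is left off here only so the sketch runs without Dagster installed.
def daily_download_config(start: datetime, _end: datetime):
    # _end is unused, but it has to be in the signature because two
    # positional arguments are passed for every partition.
    return {
        "resources": {
            "date": start.strftime("%Y-%m-%d")
        }
    }


def call_like_dagster(fn, day: datetime):
    """Hypothetical stand-in: pass the start and end of a one-day window."""
    return fn(day, day + timedelta(days=1))


def old_config(date: datetime):
    """The one-argument signature from the question, kept for comparison."""
    return {"resources": {"date": date.strftime("%Y-%m-%d")}}


# The two-argument version accepts both window bounds.
fixed = call_like_dagster(daily_download_config, datetime(2021, 12, 1))

# The one-argument version fails with the same kind of TypeError
# that appears in the traceback.
try:
    call_like_dagster(old_config, datetime(2021, 12, 1))
    old_error = None
except TypeError as exc:
    old_error = str(exc)
```

In the actual project, only the signature of daily_download_config should need to change; the decorator and the download.to_job(...) call can stay as they are in the question.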