Currently, I am facing with dagster.core.errors.PartitionExecutionError
but error logs from Dagster seem not obvious to me.
dagster.core.errors.PartitionExecutionError: Error occurred during the evaluation of the `run_config_for_partition` function for partition set download_firebase_data_local_partition_set
File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/grpc/impl.py", line 292, in get_partition_config
return ExternalPartitionConfigData(name=partition.name, run_config=run_config)
File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/contextlib.py", line 137, in __exit__
self.gen.throw(typ, value, traceback)
File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/core/errors.py", line 192, in user_code_error_boundary
raise error_cls(
The above exception was caused by the following exception:
TypeError: daily_download_config() takes 1 positional argument but 2 were given
File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/core/errors.py", line 185, in user_code_error_boundary
yield
File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/grpc/impl.py", line 291, in get_partition_config
run_config = partition_set_def.run_config_for_partition(partition)
File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/core/definitions/partition.py", line 441, in run_config_for_partition
return copy.deepcopy(self._user_defined_run_config_fn_for_partition(partition))
File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/core/definitions/time_window_partitions.py", line 192, in <lambda>
run_config_for_partition_fn=lambda partition: fn(
My current setup is
@graph
def download():
"""
Download data from BigQuery then upload to S3
"""
extract_data_in_date()
@daily_partitioned_config(start_date=datetime(2021, 12, 1))
def daily_download_config(date: datetime):
return {
"resources": {
"date": date.strftime("%Y-%m-%d")
}
}
download_local_job = download.to_job(
name=f'{NAME}_local',
resource_defs={
**{
"date": make_values_resource(date=str),
"project_name": ResourceDefinition.hardcoded_resource("test-123")
},
**RESOURCES_LOCAL,
},
config=daily_download_config,
executor_def=in_process_executor
)
I am not sure where I am wrong, can you please help
@daily_paritioned_config
needs to be able to accept two arguments, one for the start of the time window and one for the end. daily_download_config
doesn't actually make use of this end date value, but it still needs to show up in the signature because Dagster will try to pass two arguments to this function regardless