I'm trying to implement a Prefect flow whose schedule supplies varying parameters:
# Reproduction script: registering this flow fails with
# ClientError "Extra parameters were supplied: {'a', 'b'}".
from prefect import Flow, Parameter
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
# Two optional parameters. They are only created here; nothing below
# attaches them to `flow`.
a = Parameter('a', default=None, required=False)
b = Parameter('b', default=None, required=False)
# Two cron clocks, each supplying values for parameters 'a' and 'b'.
schedule = Schedule(clocks=[
# Standard cron syntax: 18:00 on Saturdays.
CronClock(' 0 18 * * 6', parameter_defaults={'a': 'a', 'b': 'b'}),
# Standard cron syntax: 12:00 on Sundays.
CronClock(' 0 12 * * 0', parameter_defaults={'a': 'a', 'b': 'b'})
])
# The flow is built with a name and the schedule only; no tasks are added.
flow = Flow(
name='test flow', schedule=schedule
)
# This call is where the ClientError is raised.
flow.register()
but I get the following error:
Result check: OK
Traceback (most recent call last):
File "/home/psimakis/.config/JetBrains/PyCharm2020.2/scratches/scratch.py", line 18, in <module>
flow.register()
File "/home/psimakis/.local/share/virtualenvs/data-workflows-GfPV92cZ/lib/python3.6/site-packages/prefect/core/flow.py", line 1443, in register
no_url=no_url,
File "/home/psimakis/.local/share/virtualenvs/data-workflows-GfPV92cZ/lib/python3.6/site-packages/prefect/client/client.py", line 673, in register
version_group_id=version_group_id,
File "/home/psimakis/.local/share/virtualenvs/data-workflows-GfPV92cZ/lib/python3.6/site-packages/prefect/client/client.py", line 226, in graphql
raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'message': "Extra parameters were supplied: {'a', 'b'}", 'locations': [{'line': 2, 'column': 5}], 'path': ['create_flow_from_compressed_string'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'errors': [{'message': "Extra parameters were supplied: {'a', 'b'}", 'locations': [], 'path': ['create_flow_from_compressed_string']}]}}}]
Environment:
Do you have any idea what causes this error?
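Parameters are a special type of Task in Prefect. To be used, they must be added to the flow, for example with `flow.add_task`. In your code, `a` and `b` are created but never attached to `flow`, so the values the clocks supply through `parameter_defaults` are rejected as extra parameters. Adding them to the flow looks like this: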
"""Build a scheduled flow whose clocks supply values for parameters a and b."""
from prefect import Flow, Parameter
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

# Optional parameters; each clock below provides a value for both of them.
a = Parameter('a', default=None, required=False)
b = Parameter('b', default=None, required=False)

# One clock per cron expression; every clock gets its own defaults dict.
schedule = Schedule(
    clocks=[
        CronClock(cron, parameter_defaults={'a': 'a', 'b': 'b'})
        for cron in (' 0 18 * * 6', ' 0 12 * * 0')
    ]
)

flow = Flow(name='test flow', schedule=schedule)

# Parameters are tasks, so they have to be added to the flow explicitly.
flow.add_task(a)
flow.add_task(b)