Cannot register a Prefect flow with different parameters

Problem description

I am trying to register a Prefect flow whose schedule supplies different parameters:

from prefect import Flow, Parameter
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

a = Parameter('a', default=None, required=False)
b = Parameter('b', default=None, required=False)

schedule = Schedule(clocks=[
    CronClock(' 0 18  *  *  6', parameter_defaults={'a': 'a', 'b': 'b'}),
    CronClock(' 0 12  *  *  0', parameter_defaults={'a': 'a', 'b': 'b'})
])

flow = Flow(
    name='test flow', schedule=schedule
)

flow.register()

But I get the following error:

Result check: OK
Traceback (most recent call last):
  File "/home/psimakis/.config/JetBrains/PyCharm2020.2/scratches/scratch.py", line 18, in <module>
    flow.register()
  File "/home/psimakis/.local/share/virtualenvs/data-workflows-GfPV92cZ/lib/python3.6/site-packages/prefect/core/flow.py", line 1443, in register
    no_url=no_url,
  File "/home/psimakis/.local/share/virtualenvs/data-workflows-GfPV92cZ/lib/python3.6/site-packages/prefect/client/client.py", line 673, in register
    version_group_id=version_group_id,
  File "/home/psimakis/.local/share/virtualenvs/data-workflows-GfPV92cZ/lib/python3.6/site-packages/prefect/client/client.py", line 226, in graphql
    raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'message': "Extra parameters were supplied: {'a', 'b'}", 'locations': [{'line': 2, 'column': 5}], 'path': ['create_flow_from_compressed_string'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'errors': [{'message': "Extra parameters were supplied: {'a', 'b'}", 'locations': [], 'path': ['create_flow_from_compressed_string']}]}}}]

Environment:

Do you have any idea what causes this error?

Tags: python-3.x, prefect

Solution


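Parameters are a special type of task in Prefect. Creating a Parameter object does not attach it to a flow. In the code from the question, the flow contains no parameters, so the parameter_defaults supplied by the clocks are rejected as extra parameters. To use the parameters, add them to the flow as follows: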

from prefect import Flow, Parameter
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

a = Parameter('a', default=None, required=False)
b = Parameter('b', default=None, required=False)

schedule = Schedule(clocks=[
    CronClock(' 0 18  *  *  6', parameter_defaults={'a': 'a', 'b': 'b'}),
    CronClock(' 0 12  *  *  0', parameter_defaults={'a': 'a', 'b': 'b'})
])

flow = Flow(
    name='test flow', schedule=schedule
)

# Parameters are tasks, so they must be added to the flow explicitly
flow.add_task(a)
flow.add_task(b)

flow.register()
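
To make the cause of the error more concrete, here is a minimal sketch in plain Python of the kind of check that produces "Extra parameters were supplied". It is a simplified illustration, not Prefect's actual server code; the function name find_extra_parameters and its arguments are hypothetical and exist only for this example.

```python
# Simplified illustration only; this is NOT Prefect's actual implementation.
# find_extra_parameters is a hypothetical helper written for this example.

def find_extra_parameters(flow_parameter_names, clock_parameter_defaults):
    """Return the names supplied by any clock that the flow does not declare."""
    supplied = set()
    for defaults in clock_parameter_defaults:
        supplied.update(defaults)  # collect the keys of each clock's defaults
    return supplied - set(flow_parameter_names)


# The same defaults as the two CronClock entries in the question.
clock_defaults = [{'a': 'a', 'b': 'b'}, {'a': 'a', 'b': 'b'}]

# Before the fix: the flow contains no Parameter tasks.
extra_before = find_extra_parameters([], clock_defaults)

# After flow.add_task(a) and flow.add_task(b): both names are declared.
extra_after = find_extra_parameters(['a', 'b'], clock_defaults)

print(sorted(extra_before))  # ['a', 'b']
print(sorted(extra_after))   # []
```

With an empty flow, both 'a' and 'b' count as extra, which matches the set reported in the traceback. Once the parameters are part of the flow, nothing is left over.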
