airflow - 触发的 DAG 无法从 TriggerDagRunOperator 获取参数
问题描述
我尝试在 TriggerDagRunOperator 中使用一些参数触发另一个 dag,但在触发的 dag 中,dag_run 对象始终为 None。
在 TriggerDagRunOperator 中,消息参数被添加到 dag_run_obj 的负载中。
def conditionally_trigger(context, dag_run_obj):
if context['params']['condition_param']:
dag_run_obj.payload = {'message': context['params']['message']}
pp.pprint(dag_run_obj.payload)
return dag_run_obj
trigger = TriggerDagRunOperator(
task_id='test_trigger_dagrun',
trigger_dag_id="example_trigger_target_dag",
python_callable=conditionally_trigger,
params={'condition_param': True, 'message': 'Hello World'},
dag=dag,
)
我预计触发的 DAG 可以使用 kwargs['dag_run'].conf['message']) 获取它,但不幸的是它不起作用。
def run_this_func(ds, **kwargs):
print("Remotely received value of {} for key=message".
format(kwargs['dag_run'].conf['message']))
run_this = PythonOperator(
task_id='run_this',
provide_context=True,
python_callable=run_this_func,
dag=dag,
)
kwargs 中的 dag_run 对象为 None
INFO - Executing <Task(PythonOperator): run_this> on 2019-01-18 16:10:18
INFO - Subtask: [2019-01-18 16:10:27,007] {models.py:1433} ERROR - 'NoneType' object has no attribute 'conf'
INFO - Subtask: Traceback (most recent call last):
INFO - Subtask: File "/Library/Python/2.7/site-packages/airflow/models.py", line 1390, in run
INFO - Subtask: result = task_copy.execute(context=context)
INFO - Subtask: File "/Library/Python/2.7/site-packages/airflow/operators/python_operator.py", line 80, in execute
INFO - Subtask: return_value = self.python_callable(*self.op_args, **self.op_kwargs)
INFO - Subtask: File "/Library/Python/2.7/site-packages/airflow/example_dags/example_trigger_target_dag.py", line 52, in run_this_func
INFO - Subtask: print("Remotely received value of {} for key=message".format(kwargs['dag_run'].conf['message']))
INFO - Subtask: AttributeError: 'NoneType' object has no attribute 'conf'
我还打印了 kwargs,确实 'dag_run' 对象是 None 。dags 是 Airflow 中的示例代码,所以我不确定发生了什么。有人知道原因吗?
INFO - Subtask: kwargs: {u'next_execution_date': None, u'dag_run': None, u'tomorrow_ds_nodash': u'20190119', u'run_id': None, u'dag': <DAG: example_trigger_target_dag>, u'prev_execution_date': None, ...
顺便说一句,如果我从 CLI 触发 DAG,它会起作用:
$ airflow trigger_dag 'example_trigger_target_dag' -r 'run_id' --conf '{"message":"test_cli"}'
日志:
INFO - Subtask: kwargs: {u'next_execution_date': None, u'dag_run': <DagRun example_trigger_target_dag @ 2019-01-18 ...
INFO - Subtask: Remotely received value of test_cli for key=message
解决方案
推荐阅读
- json - 无法从 ruby 中解析的 JSON Hash 中提取数据
- swift - Swift 中 HomeVC 注销按钮的导航 rootViewController 问题
- excel - 无循环评估索引中的动态查找范围
- angular - AngularFire基于用户身份验证检索文档ID
- sql - 掌握员工的母语和流利的语言
- flutter - Flutter:创建可重用的小部件
- user-interface - 在遵守 FreeDesktop 标准的同时,在哪里为用户特定的应用程序安装图标?
- c# - 在 ASP.NET MVC 中更新多个选择下拉列表(选择 2)
- asp.net-web-api - Azure 托管服务出错“发送请求时出错”
- json - 如何从 JSON API 调用项目并添加到 TabBar - Flutter