首页 > 解决方案 > 触发的 DAG 无法从 TriggerDagRunOperator 获取参数

问题描述

我尝试在 TriggerDagRunOperator 中使用一些参数触发另一个 dag,但在触发的 dag 中,dag_run 对象始终为 None。

在 TriggerDagRunOperator 中,消息参数被添加到 dag_run_obj 的负载中。

def conditionally_trigger(context, dag_run_obj):
if context['params']['condition_param']:
    dag_run_obj.payload = {'message': context['params']['message']}
    pp.pprint(dag_run_obj.payload)
    return dag_run_obj


trigger = TriggerDagRunOperator(
    task_id='test_trigger_dagrun',
    trigger_dag_id="example_trigger_target_dag",
    python_callable=conditionally_trigger,
    params={'condition_param': True, 'message': 'Hello World'},
    dag=dag,
)

我预计触发的 DAG 可以使用 kwargs['dag_run'].conf['message']) 获取它,但不幸的是它不起作用。

def run_this_func(ds, **kwargs):
print("Remotely received value of {} for key=message".
      format(kwargs['dag_run'].conf['message']))


run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag,
)

kwargs 中的 dag_run 对象为 None

INFO - Executing <Task(PythonOperator): run_this> on 2019-01-18 16:10:18 
INFO - Subtask: [2019-01-18 16:10:27,007] {models.py:1433} ERROR - 'NoneType' object has no attribute 'conf'
INFO - Subtask: Traceback (most recent call last):
INFO - Subtask:   File "/Library/Python/2.7/site-packages/airflow/models.py", line 1390, in run
INFO - Subtask:     result = task_copy.execute(context=context)
INFO - Subtask:   File "/Library/Python/2.7/site-packages/airflow/operators/python_operator.py", line 80, in execute
INFO - Subtask:     return_value = self.python_callable(*self.op_args, **self.op_kwargs)
INFO - Subtask:   File "/Library/Python/2.7/site-packages/airflow/example_dags/example_trigger_target_dag.py", line 52, in run_this_func
INFO - Subtask:     print("Remotely received value of {} for key=message".format(kwargs['dag_run'].conf['message']))
INFO - Subtask: AttributeError: 'NoneType' object has no attribute 'conf'

我还打印了 kwargs,确实 'dag_run' 对象是 None 。dags 是 Airflow 中的示例代码,所以我不确定发生了什么。有人知道原因吗?

INFO - Subtask: kwargs: {u'next_execution_date': None, u'dag_run': None, u'tomorrow_ds_nodash': u'20190119', u'run_id': None, u'dag': <DAG: example_trigger_target_dag>, u'prev_execution_date': None, ...

顺便说一句,如果我从 CLI 触发 DAG,它会起作用:

$ airflow trigger_dag 'example_trigger_target_dag' -r 'run_id' --conf '{"message":"test_cli"}'

日志:

INFO - Subtask: kwargs: {u'next_execution_date': None, u'dag_run': <DagRun example_trigger_target_dag @ 2019-01-18 ...

INFO - Subtask: Remotely received value of test_cli for key=message

标签: airflow

解决方案


推荐阅读