首页 > 解决方案 > 使用参数手动触发气流 DAG,然后传递给 python 函数

问题描述

我想将参数传递给气流 DAG 并在 python 函数中使用它们。我可以将参数用于 bash 运算符,但找不到任何将它们用作 python 函数的参考。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago

#Define DAG
dag = DAG("test_backup", schedule_interval=None, start_date=days_ago(1))

#Parameter
owner="{{ dag_run.conf['owner'] }}"
table="{{ dag_run.conf['table'] }}"

run_this="echo "+owner+"."+table

def test_func(owner,table):
    print(owner+"."+table)

task1 = BashOperator(
    task_id='test_task1',
    bash_command=run_this,
    dag=dag,
    queue='cdp_node53',
) 

task2 = PythonOperator(
    task_id='test_task2',
   python_callable=test_func(owner,table),
    dag=dag,
    queue='cdp_node53',
) 

我想在触发 DAG 时将以下参数作为参数传递。“task1”对我来说很好。我需要使“task2”可行。请指导我更正上面的代码,以便我可以将参数传递给它。

{"owner":"test_owner","table":"test_table"}

标签: pythonairflow

解决方案


要将参数传递给PythonOperator您应该使用op_args(对于位置参数)或op_kwargs(对于关键字参数)。这两个参数也是模板字段,因此值也可以是 Jinja 表达式。

使用重构代码op_kwargs

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago


#Define DAG
dag = DAG("test_backup", schedule_interval=None, start_date=days_ago(1))

#Parameter
owner="{{ dag_run.conf['owner'] }}"
table="{{ dag_run.conf['table'] }}"

run_this="echo "+owner+"."+table

def test_func(owner,table):
    print(owner+"."+table)

task1 = BashOperator(
    task_id='test_task1',
    bash_command=run_this,
    dag=dag,
    queue='cdp_node53',
)

task2 = PythonOperator(
    task_id='test_task2',
    python_callable=test_func,
    op_kwargs={"owner": owner, "table": table},
    dag=dag,
    queue='cdp_node53',
)

这两个任务都将记录INFO - test_owner.test_table现在。


推荐阅读