python - 使用参数手动触发气流 DAG,然后传递给 python 函数
问题描述
我想将参数传递给气流 DAG 并在 python 函数中使用它们。我可以将参数用于 bash 运算符,但找不到任何将它们用作 python 函数的参考。
from airflow import DAG
from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago
#Define DAG
dag = DAG("test_backup", schedule_interval=None, start_date=days_ago(1))
#Parameter
owner="{{ dag_run.conf['owner'] }}"
table="{{ dag_run.conf['table'] }}"
run_this="echo "+owner+"."+table
def test_func(owner,table):
print(owner+"."+table)
task1 = BashOperator(
task_id='test_task1',
bash_command=run_this,
dag=dag,
queue='cdp_node53',
)
task2 = PythonOperator(
task_id='test_task2',
python_callable=test_func(owner,table),
dag=dag,
queue='cdp_node53',
)
我想在触发 DAG 时将以下参数作为参数传递。“task1”对我来说很好。我需要使“task2”可行。请指导我更正上面的代码,以便我可以将参数传递给它。
{"owner":"test_owner","table":"test_table"}
解决方案
要将参数传递给PythonOperator
您应该使用op_args
(对于位置参数)或op_kwargs
(对于关键字参数)。这两个参数也是模板字段,因此值也可以是 Jinja 表达式。
使用重构代码op_kwargs
:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
#Define DAG
dag = DAG("test_backup", schedule_interval=None, start_date=days_ago(1))
#Parameter
owner="{{ dag_run.conf['owner'] }}"
table="{{ dag_run.conf['table'] }}"
run_this="echo "+owner+"."+table
def test_func(owner,table):
print(owner+"."+table)
task1 = BashOperator(
task_id='test_task1',
bash_command=run_this,
dag=dag,
queue='cdp_node53',
)
task2 = PythonOperator(
task_id='test_task2',
python_callable=test_func,
op_kwargs={"owner": owner, "table": table},
dag=dag,
queue='cdp_node53',
)
这两个任务都将记录INFO - test_owner.test_table
现在。
推荐阅读
- c - 在 c 中使用相同的参数打印整数和字符
- statistics - 如何确定要考虑获得与所有样本几乎相同的平均值的最小样本?
- python - 如何添加到现有字符串并将其添加到列表中
- google-bigquery - 使用 BigQuery DML 时如何更新所有列
- microsoft-graph-api - 使用 Microsoft Graph API 在 Azure AD 中添加用户
- javascript - 当任何承诺都实现时,是否有可能脱离 await Promise.all (Chrome 80)
- r - ggplot2:在图例中插入虚线
- javascript - Fabric js - 将画布宽度和高度设置为 100%
- android - Android 如何以编程方式更改进度可绘制颜色
- django - Django/SCSS:缺少静态文件清单-> 带有空路径的 staticfiles.json