python - 基于 dag_run conf 值的循环中的气流任务
问题描述
我正在尝试根据 dag_run conf 输入创建多个气流任务。conf 将有一个值数组,每个值都需要生成一个任务。该任务又需要将值传递给它的可调用函数。像这样的东西:
#create this task in a loop
task = PythonOperator(task_id="fetch_data", python_callable=fetch_data(value from array), retries=10)
Conf 的值如下:
{"fruits":["apple","kiwi","orange"]}
我认为这可以通过以下方式访问:
kwargs['dag_run'].conf('fruits')
如何在运算符之外访问此值,然后在循环中创建运算符?
解决方案
您可以将 PythonOperator 实例化包装在使用值列表的 for 循环中。
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
dag = DAG(
dag_id='fruit_name_printer',
start_date=datetime(2021, 1, 1),
schedule_interval='@once'
)
input = [
'apple',
'orange',
'banana'
]
def call_func(fruit_name):
print(fruit_name)
with dag:
for fruit in input:
printer = PythonOperator(
task_id=f'print_{fruit}',
python_callable=call_func,
op_kwargs={
'fruit_name': fruit
}
)
推荐阅读
- react-native - 是否可以在 expo react native 的后台运行 setInterval?
- mysql - MySQL远程连接被拒绝
- java - 使用多个表获取记录
- python - 遍历正则表达式中字符串匹配的每个组名
- react-native - 如何在 Jest 中运行 expo-sqlite 测试?
- rust - 保持全局 hashmap 值与 struct hashmap 值同步
- java - 对于流式日志,JNA 回调性能太慢。如何改进?
- python - Python 中的 3d 函数和矩阵——我应该使用 NumPy 吗?
- java - 泰语字符渲染问题
- data-structures - 这个循环如何运行 nlogn 次?(对于 (j = 2; j <= n; j = j * 2) )