airflow - 通过循环列表并传递参数在 Airflow 中创建任务
问题描述
编辑: 这会起作用,我定义了导致问题的 ex_func_airflow(var_1 = i)
我想通过在列表上循环来在气流中创建任务。
tabs = [1,2,3,4,5]
for i in tabs:
task = PythonOperator(
task_id = name,
provide_context=False,
op_args = [i],
python_callable=ex_func_airflow,
dag=dag)
task_0 >> task >> task_1
当它在气流中运行时,传递的参数始终是该列表中的最后一个元素。
所以我基本上是在运行:
ex_func_airflow(6)
五次而不是跑步
ex_func_airflow(1)
ex_func_airflow(2)
ex_func_airflow(3)
..ETC。
我如何才能为每个任务传递正确的参数?
解决方案
以下代码对我有用。
def print_context(ds, **kwargs):
print("hello")
def ex_func_airflow(i):
print(i)
dag = DAG(
dag_id="loop_dag",
schedule_interval=None,
start_date=datetime(2018, 12, 31),
)
task_0 = PythonOperator(
task_id='task_0',
provide_context=True,
python_callable=print_context,
dag=dag)
task_1 = PythonOperator(
task_id='task_1',
provide_context=True,
python_callable=print_context,
dag=dag)
tabs = [1, 2, 3, 4, 5]
for i in tabs:
task_id = f'task_tab_{i}'
task = PythonOperator(
task_id=task_id,
provide_context=False,
op_args=[i],
python_callable=ex_func_airflow,
dag=dag)
task_0 >> task >> task_1
推荐阅读
- parsing - Nutch/Elastic 搜索术语定义
- discord.py - 我是 discord.py 的菜鸟,我知道我的代码有什么问题
- c++ - 添加自定义 TensorFlow OP
- rust - 您如何将文本文件中的行读入 i32s 元组的向量?
- java - Jooq 使用生成的代码和 distinctOn 选择查询
- python - 使用密码学的 Python 解密会添加随机字符
- php - 如何一次使用多个 if 语句?
- python - 试图用 Python 找到 LCM(最小公倍数)
- json - 如何在 Asp.net Core 中使用 Json
- vue.js - Vuex:令牌不存储在本地存储中