首页 > 解决方案 > 基于外部文件的气流中的动态任务

问题描述

我正在从外部文件中读取元素列表并循环遍历元素以创建一系列任务。

例如,如果文件中有 2 个元素 - [A, B]。将有2个系列的任务:

A1 -> A2 ..
B1 -> B2 ...

此读取元素逻辑不是任何任务的一部分,而是在 DAG 本身中。因此,Scheduler 每天在读取 DAG 文件时调用它多次。我只想在 DAG 运行时调用它。

想知道是否已经有此类用例的模式?

标签: airflowairflow-scheduler

解决方案


根据您的要求,如果您正在寻找的是避免多次读取文件,但您不介意从元数据数据库读取多次,那么您可以更改您的方法以Variables用作迭代源动态创建任务。

一个基本示例可能是在 a 中执行文件读取PythonOperator并设置Variables您稍后将使用的迭代(相同的可调用):

示例文件.json:

{
    "cities": [ "London", "Paris", "BA", "NY" ]
}

任务定义:

from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.utils.task_group import TaskGroup
import json

def _read_file():
    with open('dags/sample_file.json') as f:
        data = json.load(f)
        Variable.set(key='list_of_cities',
                     value=data['cities'], serialize_json=True)
        print('Loading Variable from file...')

def _say_hello(city_name):
    print('hello from ' + city_name)

with DAG('dynamic_tasks_from_var', schedule_interval='@once',
         start_date=days_ago(2),
         catchup=False) as dag:

    read_file = PythonOperator(
        task_id='read_file',
        python_callable=_read_file
    )

然后您可以从该变量中读取并创建动态任务。(设置 a 很重要default_var)。TaskGroup是可选的。

    # Top-level code
    updated_list = Variable.get('list_of_cities',
                                default_var=['default_city'],
                                deserialize_json=True)
    print(f'Updated LIST: {updated_list}')

    with TaskGroup('dynamic_tasks_group',
                   prefix_group_id=False,
                   ) as dynamic_tasks_group:

        for index, city in enumerate(updated_list):
            say_hello = PythonOperator(
                task_id=f'say_hello_from_{city}',
                python_callable=_say_hello,
                op_kwargs={'city_name': city}
            )

# DAG level dependencies
read_file >> dynamic_tasks_group

调度程序日志中,您只会发现:

INFO - Updated LIST: ['London', 'Paris', 'BA', 'NY']

达格图视图:

dag 图视图

使用这种方法,调度程序连续读取的顶级代码是对方法的调用。如果您需要读取多个变量,请务必记住,建议将它们存储在一个 JSON 值中,以避免不断创建与元数据数据库的连接(本文中的示例)。Variable.get()

更新:

  • 至于 11-2021,这种方法被认为是一种“快速而肮脏”的解决方案。
  • 它有效吗?是的,完全。是生产质量代码吗?不。
  • 它出什么问题了?每次调度程序解析文件时都会访问数据库,默认情况下每 30 秒访问一次,与您的 DAG 执行无关。有关 Airflow 最佳实践、顶级代码的完整详细信息。
  • 如何改进?考虑是否有任何关于动态 DAG 生成的推荐方法适用于您的需求。

推荐阅读