airflow - 基于外部文件的气流中的动态任务
问题描述
我正在从外部文件中读取元素列表并循环遍历元素以创建一系列任务。
例如,如果文件中有 2 个元素 - [A, B]。将有2个系列的任务:
A1 -> A2 ..
B1 -> B2 ...
此读取元素逻辑不是任何任务的一部分,而是在 DAG 本身中。因此,Scheduler 每天在读取 DAG 文件时调用它多次。我只想在 DAG 运行时调用它。
想知道是否已经有此类用例的模式?
解决方案
根据您的要求,如果您正在寻找的是避免多次读取文件,但您不介意从元数据数据库读取多次,那么您可以更改您的方法以Variables
用作迭代源动态创建任务。
一个基本示例可能是在 a 中执行文件读取PythonOperator
并设置Variables
您稍后将使用的迭代(相同的可调用):
示例文件.json:
{
"cities": [ "London", "Paris", "BA", "NY" ]
}
任务定义:
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.utils.task_group import TaskGroup
import json
def _read_file():
with open('dags/sample_file.json') as f:
data = json.load(f)
Variable.set(key='list_of_cities',
value=data['cities'], serialize_json=True)
print('Loading Variable from file...')
def _say_hello(city_name):
print('hello from ' + city_name)
with DAG('dynamic_tasks_from_var', schedule_interval='@once',
start_date=days_ago(2),
catchup=False) as dag:
read_file = PythonOperator(
task_id='read_file',
python_callable=_read_file
)
然后您可以从该变量中读取并创建动态任务。(设置 a 很重要default_var
)。TaskGroup
是可选的。
# Top-level code
updated_list = Variable.get('list_of_cities',
default_var=['default_city'],
deserialize_json=True)
print(f'Updated LIST: {updated_list}')
with TaskGroup('dynamic_tasks_group',
prefix_group_id=False,
) as dynamic_tasks_group:
for index, city in enumerate(updated_list):
say_hello = PythonOperator(
task_id=f'say_hello_from_{city}',
python_callable=_say_hello,
op_kwargs={'city_name': city}
)
# DAG level dependencies
read_file >> dynamic_tasks_group
在调度程序日志中,您只会发现:
INFO - Updated LIST: ['London', 'Paris', 'BA', 'NY']
达格图视图:
使用这种方法,调度程序连续读取的顶级代码是对方法的调用。如果您需要读取多个变量,请务必记住,建议将它们存储在一个 JSON 值中,以避免不断创建与元数据数据库的连接(本文中的示例)。Variable.get()
更新:
推荐阅读
- flutter - 无法找到或加载主类 Afzal\AppData\cmdline-tools\latest\bin\\
- android - 为什么ID被删除?
- sql-server - 递归 CTE:父子关系
- conda - 我想删除 conda 的环境名称
- angular - 如何使用 Angular 滚动指令更改 ng-class?
- javascript - 尝试使用嵌入式 YouTube 链接在自定义 HTML 中自动播放
- next.js - Storyblok 的实时内容更新在生产环境 Next.js 中不起作用
- php - 标头在本地工作,但在网上不工作
- operators - ocaml 中的一元减号和一元波浪号减号运算符有什么区别?
- sql - 活动日志表中 ID 列的最佳类型