airflow - 带有配置/参数 json 的 Airflow DAG 并循环到该参数以生成运算符
问题描述
我有一个手动触发的 dag。它需要一个参数,如:
{"id_list":"3,5,1"}
在 DAG 中,我根据这个整数列表动态创建运算符:
for id in id_list:
task = create_task(id)
我需要id_list
根据id_list
. 由于不在模板字段中时无法直接引用该参数,如何初始化该列表?这就是我想在图形视图中看到它的方式,其中流程任务基于id_list
参数。
我已经看到了动态创建任务的示例,但它们并不是真正动态的,因为列表值是硬编码的。如果有意义的话,这些任务是根据硬编码值列表动态创建的。
解决方案
首先,创建固定数量的任务来执行。此示例使用 PythonOperator。在 中python_callable
,如果index
小于 then 的长度,param_list
则执行 else raiseAirflowSkipException
def execute(index, account_ids):
param_list = account_ids.split(',')
if index < len(param_list):
print(f"execute task index {index}")
else:
raise AirflowSkipException
def create_task(task_id, index):
return PythonOperator(task_id=task_id,
python_callable=execute,
op_kwargs={
"index": index,
"account_ids": "{{ dag_run.conf['account_ids'] }}"}
)
record_size_limit = 5
ACCOUNT_LIST = [None] * record_size_limit
for idx in range(record_size_limit):
task = create_task(f"task_{idx}", idx)
task
触发 DAG 并将其作为参数传递:
图表视图: