airflow - 在气流中以随机顺序运行任务
问题描述
目前我有一个任务列表,这些任务每天都需要在同一时间运行,但是它们都是相互独立的。我知道我可以将它们设置为按特定顺序运行,即t1 >> t2 >> t3
,但是我希望顺序是随机的,因此它们完成的顺序并不总是相同的。如何以随机顺序运行气流任务列表?
解决方案
你刚才说它们是相互独立的,你为什么不同时运行它们呢?
这可以通过简单地不使用任何移位运算符来实现,例如:
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
args = {
'owner': 'Airflow',
'start_date': days_ago(0)
}
dag = DAG(dag_id='example_random_task', default_args=args, max_active_runs=0, catchup=False)
first_operator = DummyOperator(task_id='{}_operator'.format("first"), dag=dag)
second_operator = DummyOperator(task_id='{}_operator'.format("second"), dag=dag)
third_operator = DummyOperator(task_id='{}_operator'.format("third"), dag=dag)
但是,如果您真的想要随机排列任务并使其在某种随机队列中可执行,您可以将所有任务添加到列表中,然后随机播放。然后迭代任务并使当前依赖于下一个,例如:
为此,请使用random.shuffle()
which shuffles list in-place:
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
import random
args = {
'owner': 'Airflow',
'start_date': days_ago(0)
}
dag = DAG(dag_id='example_random_task', default_args=args, max_active_runs=0, catchup=False)
first_operator = DummyOperator(task_id='{}_operator'.format("first"), dag=dag)
second_operator = DummyOperator(task_id='{}_operator'.format("second"), dag=dag)
third_operator = DummyOperator(task_id='{}_operator'.format("third"), dag=dag)
tasks_list = [first_operator, second_operator, third_operator]
random.shuffle(tasks_list)
i = 0
while i < len(tasks_list) - 1:
tasks_list[i] << tasks_list[i + 1]
i += 1
玩得开心!
推荐阅读
- google-cloud-platform - 从 GCP Console 创建短期服务帐号凭据
- linux - 如何添加包含在 shell 脚本中的标志?
- javascript - 如何检测页面在vue上刷新?
- java - java擦除如何决定返回类型的转换?
- asp.net - 试图确保在 IIS 中使用 httpOnly 属性设置“cookies”
- java - 读取作为正在运行的 jar 一部分的文本文件
- php - PHP 在通过 session_name() 设置 cookie 名称后创建新的会话名称
- c++ - WINDOWS如何判断一个内存地址是否对程序是静态的
- c# - 如果两个 nuget 包指向同一个 Dll 怎么办
- c - 是否可以使用“touch”命令管理间接 .h 依赖项?