首页 > 解决方案 > 在气流中以随机顺序运行任务

问题描述

目前我有一个任务列表,这些任务每天都需要在同一时间运行,但是它们都是相互独立的。我知道我可以将它们设置为按特定顺序运行,即t1 >> t2 >> t3,但是我希望顺序是随机的,因此它们完成的顺序并不总是相同的。如何以随机顺序运行气流任务列表?

标签: airflowairflow-scheduler

解决方案


你刚才说它们是相互独立的,你为什么不同时运行它们呢?

在此处输入图像描述

这可以通过简单地不使用任何移位运算符来实现,例如:

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'Airflow',
    'start_date': days_ago(0)
}

dag = DAG(dag_id='example_random_task', default_args=args, max_active_runs=0, catchup=False)

first_operator = DummyOperator(task_id='{}_operator'.format("first"), dag=dag)
second_operator = DummyOperator(task_id='{}_operator'.format("second"), dag=dag)
third_operator = DummyOperator(task_id='{}_operator'.format("third"), dag=dag)

但是,如果您真的想要随机排列任务并使其在某种随机队列中可执行,您可以将所有任务添加到列表中,然后随机播放。然后迭代任务并使当前依赖于下一个,例如:
在此处输入图像描述

为此,请使用random.shuffle()which shuffles list in-place:

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
import random

args = {
    'owner': 'Airflow',
    'start_date': days_ago(0)
}

dag = DAG(dag_id='example_random_task', default_args=args, max_active_runs=0, catchup=False)

first_operator = DummyOperator(task_id='{}_operator'.format("first"), dag=dag)
second_operator = DummyOperator(task_id='{}_operator'.format("second"), dag=dag)
third_operator = DummyOperator(task_id='{}_operator'.format("third"), dag=dag)

tasks_list = [first_operator, second_operator, third_operator]
random.shuffle(tasks_list)

i = 0
while i < len(tasks_list) - 1:
    tasks_list[i] << tasks_list[i + 1]
    i += 1

玩得开心!


推荐阅读