首页 > 解决方案 > 气流集群策略未生效

问题描述

我正在尝试在 Airflow 1.9 中使用集群策略。我按照官方文档中的说明进行操作,但似乎没有生效。

在我的文件中$AIRFLOW_HOME/config/airflow_local_settings.py,我按照文档的指示定义了该方法,并且它具有以下签名:

def policy(task_instance):

其他问题:

谁能帮我理解为什么这个集群策略没有生效?

我正在使用气流 1.9。

标签: airflowairflow-scheduler

解决方案


因此,根据文档,您似乎将文件放在正确的位置:https ://github.com/apache/airflow/blob/master/docs/concepts.rst#where-to-put-airflow_local_settingspy

你的签名是正确的:https ://airflow.apache.org/docs/stable/concepts.html#mutate-tasks-after-dag-loaded

但是你还没有展示你做了什么以及它是如何“不起作用”的。

我相信def policy(task):签名是在 DAG 解析之后在调度程序上运行的(正如文档似乎所说的那样),而def task_instance_mutation_hook(ti):签名是由工作人员上的任务执行程序运行的。这可能就是您没有看到一些变化的原因。

EGtimeout或者queue是调度程序强制执行的内容,但这connection ID是工作人员在执行期间需要知道的内容。

因此,如果您想要工作的是超时策略,它应该有,但如果您想要工作的是连接 ID 强制,那就不会了。


推荐阅读