airflow - 气流集群策略未生效
问题描述
我正在尝试在 Airflow 1.9 中使用集群策略。我按照官方文档中的说明进行操作,但似乎没有生效。
在我的文件中$AIRFLOW_HOME/config/airflow_local_settings.py
,我按照文档的指示定义了该方法,并且它具有以下签名:
def policy(task_instance):
其他问题:
- 哪个 Airflow 组件实际上在运行策略代码(是调度程序)?
- 是否有推荐的方法来单元测试集群策略代码?如果没有,那么本地测试呢?
谁能帮我理解为什么这个集群策略没有生效?
我正在使用气流 1.9。
解决方案
因此,根据文档,您似乎将文件放在正确的位置:https ://github.com/apache/airflow/blob/master/docs/concepts.rst#where-to-put-airflow_local_settingspy
你的签名是正确的:https ://airflow.apache.org/docs/stable/concepts.html#mutate-tasks-after-dag-loaded
但是你还没有展示你做了什么以及它是如何“不起作用”的。
我相信def policy(task):
签名是在 DAG 解析之后在调度程序上运行的(正如文档似乎所说的那样),而def task_instance_mutation_hook(ti):
签名是由工作人员上的任务执行程序运行的。这可能就是您没有看到一些变化的原因。
EGtimeout
或者queue
是调度程序强制执行的内容,但这connection ID
是工作人员在执行期间需要知道的内容。
因此,如果您想要工作的是超时策略,它应该有,但如果您想要工作的是连接 ID 强制,那就不会了。
推荐阅读
- java - 如何将新元素添加到Java中二维数组中的下一个空索引?
- ruby - 如何使用ruby过滤掉vcenter中某个网段下的所有vm?
- oracle - 如何从 Oracle Fusion 运行报告 API 中获取最少记录
- angular - 想要以角度将数据从服务传递到组件
- node.js - 当我尝试连接到 mongodb 时出现这些错误
- python-3.x - 从 Python 中的列表中删除相似的字符串
- python - 列表名称的id在python中指定了什么?
- regex - regexp_extract 是否适用于多种模式?-Spark sql
- vue.js - 在Vue中的for循环中过滤列表
- javascript - dx-data-grid 保存按钮处于活动状态