airflow - Cloud Composer 将自定义插件导入到所有现有的 dag
问题描述
我正在使用 Cloud Composer 来安排多个 DAG。这些 DAG 是使用此方法动态构建的,并且它们使用自定义插件。
我想知道在添加/修改涉及所有 DAG 的插件时如何进行(假设它为每个 DAG 添加了一个新任务)?
这样做时我们是否需要暂停所有正在运行的 DAG?
到目前为止,我在添加/修改插件时所做的是:
- 将插件上传到
plugins
Composer 集群的存储桶中(使用 gcloud composer 命令) - 在 Airflow 配置中进行虚拟更新 -> 向 airflow.cfg 添加一个虚拟值(使用 gcloud composer 命令)
我这样做是为了强制 DAG 暂停,一旦更新完成,DAG 就会恢复,但会使用新插件和新任务(或者如果它不在这个 dagrun 中,那么它就是下一个)。没用吗?
谢谢,如果你能帮忙。
解决方案
正如架构图中所解释的,您在其中查看 DAG 和插件代码的Airflow 网络服务器在 Google 管理的租户项目中运行,而实际运行 DAG 和插件代码的Airflow 工作人员则直接在您的项目中。
当 DAG/Plugin 放置在Composer 存储桶中时,Airflow 网络服务器(属于租户项目)验证代码并更新 Airflow 数据库中的任何新调度更改。
同时,Airflow 调度程序(在您的项目中)要求 Airflow 数据库运行下一个 DAG,并通知 Airflow 工作人员执行计划的工作。Airflow 工作人员(在您的项目中)然后从 Composer 存储桶中获取 DAG/Plugin 代码并编译它们以运行该特定任务。
因此, Airflow 网络服务器和 Airflow 工作人员在不同时间分别读取对 DAG/插件代码所做的任何更新。
如果您在 Airflow 网络服务器中看不到您的新代码,那么当工作人员在新任务运行时获取新代码时,仍应将其拾取。
因此,您不必为工作人员重新启动 Composer 以获取更改。
您不能强制工作人员在任务执行期间抓取并重新编译新代码。
如果没有更新,有两种方法可以刷新 Airflow Webserver 以查看插件代码更改:
通过Console中的“AIRFLOW CONFIGURATIONS OVERRIDE”选项卡将
reload_on_plugin_change
属性设置为。True
[webserver]
或者,您可以通过 '<a href="https://cloud.google.com/composer/docs/how-to/using/installing-python-dependencies#install-package 专门添加/删除/更新 PYPI 包" rel="nofollow noreferrer">PYPI PACKAGES 的控制台选项卡。非 PYPI 包更改不会触发 Web 服务器重启。请注意,这还将启动整个 Composer 环境重新启动,这可能需要大约 20 分钟。
推荐阅读
- r - 从 R 中的 2 列合并每个单元格的多个条目
- c# - MassTransit - PrefetchCount 和单个消费者的多个渠道的解释
- swift - Swift 将 json 响应传递给 PKPass 对象
- php - Wordpress 头像大小和类别的数组
- javascript - UTF-16 十六进制解码 NodeJS
- java - Hibernate: session.byNaturalId().using().load() 返回带有空标识符的实例
- node.js - 为什么从 s3 存储桶下载图像时使用Sharp调整图像大小不起作用
- python - 如何修复 Tkinter 标签不出现?
- python - 从 Kivy 的另一个屏幕更改小部件
- android - 在 NavDirections 中找不到 safeargs 参数