首页 > 解决方案 > Cloud Composer 将自定义插件导入到所有现有的 dag

问题描述

我正在使用 Cloud Composer 来安排多个 DAG。这些 DAG 是使用此方法动态构建的,并且它们使用自定义插件。

我想知道在添加/修改涉及所有 DAG 的插件时如何进行(假设它为每个 DAG 添加了一个新任务)?

这样做时我们是否需要暂停所有正在运行的 DAG?

到目前为止,我在添加/修改插件时所做的是:

我这样做是为了强制 DAG 暂停,一旦更新完成,DAG 就会恢复,但会使用新插件和新任务(或者如果它不在这个 dagrun 中,那么它就是下一个)。没用吗?

谢谢,如果你能帮忙。

标签: airflowgoogle-cloud-composer

解决方案


正如架构图中所解释的,您在其中查看 DAG 和插件代码的Airflow 网络服务器在 Google 管理的租户项目中运行,而实际运行 DAG 和插件代码的Airflow 工作人员则直接在您的项目中。

当 DAG/Plugin 放置在Composer 存储桶中时,Airflow 网络服务器(属于租户项目)验证代码并更新 Airflow 数据库中的任何新调度更改。

同时,Airflow 调度程序(在您的项目中)要求 Airflow 数据库运行下一个 DAG,并通知 Airflow 工作人员执行计划的工作。Airflow 工作人员(在您的项目中)然后从 Composer 存储桶中获取 DAG/Plugin 代码并编译它们以运行该特定任务。

因此, Airflow 网络服务器和 Airflow 工作人员在不同时间分别读取对 DAG/插件代码所做的任何更新。

  • 如果您在 Airflow 网络服务器中看不到您的新代码,那么当工作人员在新任务运行时获取新代码时,仍应将其拾取。

  • 因此,您不必为工作人员重新启动 Composer 以获取更改。

  • 您不能强制工作人员在任务执行期间抓取并重新编译新代码。

如果没有更新,有两种方法可以刷新 Airflow Webserver 以查看插件代码更改:

  1. 通过Console中的“AIRFLOW CONFIGURATIONS OVERRIDE”选项卡将reload_on_plugin_change属性设置为。True[webserver]

  2. 或者,您可以通过 '<a href="https://cloud.google.com/composer/docs/how-to/using/installing-python-dependencies#install-package 专门添加/删除/更新 PYPI 包" rel="nofollow noreferrer">PYPI PACKAGES 的控制台选项卡。非 PYPI 包更改不会触发 Web 服务器重启。请注意,这还将启动整个 Composer 环境重新启动,这可能需要大约 20 分钟。


推荐阅读