首页 > 解决方案 > pod_mutation_hook 函数不适用于使用 KubernetesExecutor 在 kubernetes 中运行的气流

问题描述

我正在尝试将在 kubernetes 中运行的气流部署CeleryExecutorKubernetesExecutor. 在我的本地开发环境中一切都很顺利(在 minikube 上运行),但是我需要在生产中加载一个 sidecar 容器来运行允许我连接到我的 sql 数据库的代理。经过一番谷歌搜索后,似乎在某处的文件中定义pod_mutation_hook函数是应该如何完成此任务的。airflow_local_settings.py$PYTHONPATH

首先,我尝试根据此示例在配置映射中定义它。例如

apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-config
  namespace: dev
data:
  ...

  AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM: "airflow-logs"

  AIRFLOW__KUBERNETES__AIRFLOW_LOCAL_SETTINGS_CONFIGMAP: "airflow-config"
  ...

  airflow_local_settings.py: |
    from airflow.contrib.kubernetes.pod import Pod

    def pod_mutation_hook(pod: Pod):
        extra_labels = {
            "test-label": "True",
        }
        pod.labels.update(extra_labels)

我在文件中指定了这个 configmap airflow.cfg,它被拾取并正常安装,所有其他 env 变量都正常工作,但pod_mutation_hook似乎没有运行,因为 kubernetes 执行程序启动的结果 pod 中没有添加任何标签(请注意日志此处还指定了卷声明,并且可以正常工作)。

接下来,我尝试按照此处的评论中的建议在airflow_local_settings.py图像中定义气流正在为工作启动的文件。我还从上面的配置图中删除了相关部分。这似乎也对为作业创建的结果 pod 没有影响,因为它也缺少指定的标签。$AIRFLOW_HOME/configs/airflow_local_settings.pyairflow-config

所以,我不确定此时如何进行,因为我不明白我应该如何指定airflow_local_settings.py文件和pod_mutation_hook函数,以便它们在运行之前实际改变 pod。任何帮助将不胜感激。谢谢你。

标签: kubernetesairflowminikubekubernetesexecutor

解决方案


摘要

如果您希望 KubernetesExecutor 或 KubernetesPodOperator(具有不同的 Executor)启动的所有 Pod 上的 Sidecar,您至少应该为 Scheduler 放置airflow_local_settings.py文件,因为它们的 POD 都是由 Scheduler 启动的。PYTHONPATH

KubernetesPodOperator但是,如果您还想要在使用时启动的 POD 上的边车,KubernetesExecutor则需要设置(如在airflow_local_settings_configmaphttps://github.com/astronomer/airflow-chart/blob/f3dddefe43c92d594cfcfe9c5b001680f45a986/templates/configmap.yaml#L72完成)当您将 KubernetesExecutor 与 Kubernetes 一起使用时,任务 pod(与 KubernetesPodOperator 一起)将由 worker POD 启动。airflow.cfg

请注意我们如何将相同的配置映射传递给调度程序部署(https://github.com/astronomer/airflow-chart/blob/f3dddefe43c92d594cfcfe9c5b001680f45a986/templates/scheduler/scheduler-deployment.yaml#L125-L135)和airflow.cfg自身,因为我们希望所有 POD 通过 pod_mutation_hook 进行变异。

详情

“airflow.cfg”和“airflow_local_settings.py”文件需要存在于调度器上(你的调度器是在虚拟机上还是在 POD 上与这里无关)。我们还添加了有关在何处输出此文件的文档:https ://airflow.apache.org/docs/stable/concepts.html#where-to-put-airflow-local-settings-py

pod_mutation_hook每当您使用KubernetesExecutor或时都会使用now KubernetePodOperator。KubernetesExecutor 或 KubernetesExecutor 启动的 POD 将使用此突变挂钩。

现在,回到配置图。当您正在使用KubernetesExecutor并有一个使用 KuberneretPodOperator 的任务时,您需要在 KubernetesExecutor 启动的工作 pod 上同时存在airflow.cfg和文件。airflow_local_settings.py

KubernetesExecutor 会为此任务启动一个 Worker Pod。

Scheduler Pod ---> Worker Pod (Pod_1 -- 由 KubernetesExecuetor 启动) --> (Pod_2 -- 由 Pod_1 使用 KubernetePodOperator 启动的任务)

现在,airflow.cfg 中的整个[kubernetes]部分(https://github.com/apache/airflow/blob/master/airflow/config_templates/default_airflow.cfg#L870-L1028)仅用于 KubernetesExecutor 并影响挂载的内容在KubernetesExecutor 启动的Worker Pod 上。

如果您不指定airflow_local_settingsconfigmap,airflow_local_settings 文件将不会挂载到 worker pod(上例中的 Pod_1),并且只会挂载 airflow.cfg 文件。所以现在对于 Pod_2(由 Pod_1 启动)——(当您将 KubernetesPodOperator 与 KubernetesExecutor 一起使用时的特殊情况),因为 Pod_1(工作 POD)没有airflow_local_settings.py文件,即使调度程序有它,Pod_2 也不会发生变异,因为文件没有那里不存在。

认为它与airflow.cfg 相同——为什么要将airflow.cfg文件同时挂载到Scheduler POD 和worker POD。同样,对于这种边缘情况,您需要airflow_local_settings.py在两个地方都有文件。

https://github.com/apache/airflow/blob/ba2d6408e64f219e8f53a20a5a149e3d8109db31/airflow/kubernetes/worker_configuration.py#L279-L305 --> 此代码用于决定在 Worker Pod 上挂载的内容(REF_1

https://github.com/apache/airflow/blob/ba2d6408e64f219e8f53a20a5a149e3d8109db31/airflow/executors/kubernetes_executor.py#L462-L481 --> 为 KubernetesExecutor 运行的每个任务创建的 Pod ( REF_2 ) -- 变异应用于此 POD因为这是由调度程序启动的并且它有airflow_local_settings.py文件

https://github.com/apache/airflow/blob/ba2d6408e64f219e8f53a20a5a149e3d8109db31/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py#L383 --> 此代码用于在使用KubernetesPod Operator时创建新的POD(REF_3) -- 由于airflow_local_settings.py未安装在REF_2中生成的 POD 上,因此未将突变应用于此 POD。


推荐阅读