kubernetes - pod_mutation_hook 函数不适用于使用 KubernetesExecutor 在 kubernetes 中运行的气流
问题描述
我正在尝试将在 kubernetes 中运行的气流部署CeleryExecutor
从KubernetesExecutor
. 在我的本地开发环境中一切都很顺利(在 minikube 上运行),但是我需要在生产中加载一个 sidecar 容器来运行允许我连接到我的 sql 数据库的代理。经过一番谷歌搜索后,似乎在某处的文件中定义pod_mutation_hook函数是应该如何完成此任务的。airflow_local_settings.py
$PYTHONPATH
首先,我尝试根据此示例在配置映射中定义它。例如
apiVersion: v1
kind: ConfigMap
metadata:
name: airflow-config
namespace: dev
data:
...
AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM: "airflow-logs"
AIRFLOW__KUBERNETES__AIRFLOW_LOCAL_SETTINGS_CONFIGMAP: "airflow-config"
...
airflow_local_settings.py: |
from airflow.contrib.kubernetes.pod import Pod
def pod_mutation_hook(pod: Pod):
extra_labels = {
"test-label": "True",
}
pod.labels.update(extra_labels)
我在文件中指定了这个 configmap airflow.cfg
,它被拾取并正常安装,所有其他 env 变量都正常工作,但pod_mutation_hook
似乎没有运行,因为 kubernetes 执行程序启动的结果 pod 中没有添加任何标签(请注意日志此处还指定了卷声明,并且可以正常工作)。
接下来,我尝试按照此处的评论中的建议在airflow_local_settings.py
图像中定义气流正在为工作启动的文件。我还从上面的配置图中删除了相关部分。这似乎也对为作业创建的结果 pod 没有影响,因为它也缺少指定的标签。$AIRFLOW_HOME/configs/airflow_local_settings.py
airflow-config
所以,我不确定此时如何进行,因为我不明白我应该如何指定airflow_local_settings.py
文件和pod_mutation_hook
函数,以便它们在运行之前实际改变 pod。任何帮助将不胜感激。谢谢你。
解决方案
摘要:
如果您希望 KubernetesExecutor 或 KubernetesPodOperator(具有不同的 Executor)启动的所有 Pod 上的 Sidecar,您至少应该为 Scheduler 放置airflow_local_settings.py
文件,因为它们的 POD 都是由 Scheduler 启动的。PYTHONPATH
KubernetesPodOperator
但是,如果您还想要在使用时启动的 POD 上的边车,KubernetesExecutor
则需要设置(如在airflow_local_settings_configmap
https://github.com/astronomer/airflow-chart/blob/f3dddefe43c92d594cfcfe9c5b001680f45a986/templates/configmap.yaml#L72完成)当您将 KubernetesExecutor 与 Kubernetes 一起使用时,任务 pod(与 KubernetesPodOperator 一起)将由 worker POD 启动。airflow.cfg
请注意我们如何将相同的配置映射传递给调度程序部署(https://github.com/astronomer/airflow-chart/blob/f3dddefe43c92d594cfcfe9c5b001680f45a986/templates/scheduler/scheduler-deployment.yaml#L125-L135)和airflow.cfg
自身,因为我们希望所有 POD 通过 pod_mutation_hook 进行变异。
详情:
“airflow.cfg”和“airflow_local_settings.py”文件需要存在于调度器上(你的调度器是在虚拟机上还是在 POD 上与这里无关)。我们还添加了有关在何处输出此文件的文档:https ://airflow.apache.org/docs/stable/concepts.html#where-to-put-airflow-local-settings-py
pod_mutation_hook
每当您使用KubernetesExecutor
或时都会使用now KubernetePodOperator
。KubernetesExecutor 或 KubernetesExecutor 启动的 POD 将使用此突变挂钩。
现在,回到配置图。当您正在使用KubernetesExecutor
并有一个使用 KuberneretPodOperator 的任务时,您需要在 KubernetesExecutor 启动的工作 pod 上同时存在airflow.cfg
和文件。airflow_local_settings.py
KubernetesExecutor 会为此任务启动一个 Worker Pod。
Scheduler Pod ---> Worker Pod (Pod_1 -- 由 KubernetesExecuetor 启动) --> (Pod_2 -- 由 Pod_1 使用 KubernetePodOperator 启动的任务)
现在,airflow.cfg 中的整个[kubernetes]部分(https://github.com/apache/airflow/blob/master/airflow/config_templates/default_airflow.cfg#L870-L1028)仅用于 KubernetesExecutor 并影响挂载的内容在KubernetesExecutor 启动的Worker Pod 上。
如果您不指定airflow_local_settings
configmap,airflow_local_settings 文件将不会挂载到 worker pod(上例中的 Pod_1),并且只会挂载 airflow.cfg 文件。所以现在对于 Pod_2(由 Pod_1 启动)——(当您将 KubernetesPodOperator 与 KubernetesExecutor 一起使用时的特殊情况),因为 Pod_1(工作 POD)没有airflow_local_settings.py
文件,即使调度程序有它,Pod_2 也不会发生变异,因为文件没有那里不存在。
认为它与airflow.cfg 相同——为什么要将airflow.cfg
文件同时挂载到Scheduler POD 和worker POD。同样,对于这种边缘情况,您需要airflow_local_settings.py
在两个地方都有文件。
https://github.com/apache/airflow/blob/ba2d6408e64f219e8f53a20a5a149e3d8109db31/airflow/kubernetes/worker_configuration.py#L279-L305 --> 此代码用于决定在 Worker Pod 上挂载的内容(REF_1)
https://github.com/apache/airflow/blob/ba2d6408e64f219e8f53a20a5a149e3d8109db31/airflow/executors/kubernetes_executor.py#L462-L481 --> 为 KubernetesExecutor 运行的每个任务创建的 Pod ( REF_2 ) -- 变异应用于此 POD因为这是由调度程序启动的并且它有airflow_local_settings.py
文件
https://github.com/apache/airflow/blob/ba2d6408e64f219e8f53a20a5a149e3d8109db31/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py#L383 --> 此代码用于在使用KubernetesPod Operator时创建新的POD(REF_3) -- 由于airflow_local_settings.py
未安装在REF_2中生成的 POD 上,因此未将突变应用于此 POD。
推荐阅读
- opencv - 在树莓派上使用 cmake 构建 opencv 时出错
- r - 如何根据给定字符的一次或多次出现来拆分字符串?
- python - “ModuleNotFoundError: No module named 'numpy'”,但我已经安装了 NumPy
- reactjs - 使用侧边栏组件反应参数路由
- javascript - 如何使 useEffect 对 props 的反应与 ComponentDidUpdate 一样快?
- ruby-on-rails - 将带有数组的哈希存储为rails中的值
- ios - iOS Swift - URLSessionDataTask 在刷新时没有得到更新的 JSON 数据
- java - 在 kotlin 中将 String 转换为 JsonObject 返回 null
- python - 有没有办法获取特定电子邮件 ID 的 ID?
- css - 样式输入类型编号