python - Airflow KubernetesPodOperator times out on local MicroK8s
Question
I am trying to launch a test pod with the KubernetesPodOperator. As the image I am using the hello-world example from Docker, which I pushed to the local registry of my MicroK8s installation.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.kubernetes.pod import Port
from airflow.utils.dates import days_ago
from datetime import timedelta

ports = [Port('http', 80)]

default_args = {
    'owner': 'user',
    'start_date': days_ago(5),
    'email': ['user@mail'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}

workflow = DAG(
    'kubernetes_helloworld',
    default_args=default_args,
    description='Our first DAG',
    schedule_interval=None,
)

op = DummyOperator(task_id='dummy', dag=workflow)

t1 = KubernetesPodOperator(
    dag=workflow,
    namespace='default',
    image='localhost:32000/hello-world:registry',
    name='pod2',
    task_id='pod2',
    is_delete_operator_pod=True,
    hostnetwork=False,
    get_logs=True,
    do_xcom_push=False,
    in_cluster=False,
    ports=ports,
)

op >> t1
When I trigger the DAG, it keeps running and retries launching the pod indefinitely. This is the log output I get in Airflow:
Reading local file: /home/user/airflow/logs/kubernetes_helloworld/pod2/2021-03-17T16:25:11.142695+00:00/4.log
[2021-03-17 16:30:00,315] {taskinstance.py:851} INFO - Dependencies all met for <TaskInstance: kubernetes_helloworld.pod2 2021-03-17T16:25:11.142695+00:00 [queued]>
[2021-03-17 16:30:00,319] {taskinstance.py:851} INFO - Dependencies all met for <TaskInstance: kubernetes_helloworld.pod2 2021-03-17T16:25:11.142695+00:00 [queued]>
[2021-03-17 16:30:00,319] {taskinstance.py:1042} INFO -
--------------------------------------------------------------------------------
[2021-03-17 16:30:00,320] {taskinstance.py:1043} INFO - Starting attempt 4 of 1
[2021-03-17 16:30:00,320] {taskinstance.py:1044} INFO -
--------------------------------------------------------------------------------
[2021-03-17 16:30:00,330] {taskinstance.py:1063} INFO - Executing <Task(KubernetesPodOperator): pod2> on 2021-03-17T16:25:11.142695+00:00
[2021-03-17 16:30:00,332] {standard_task_runner.py:52} INFO - Started process 9021 to run task
[2021-03-17 16:30:00,335] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'kubernetes_helloworld', 'pod2', '2021-03-17T16:25:11.142695+00:00', '--job-id', '57', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/kubernetes_helloworld.py', '--cfg-path', '/tmp/tmp5ss4g6q4', '--error-file', '/tmp/tmp9t3l8emt']
[2021-03-17 16:30:00,336] {standard_task_runner.py:77} INFO - Job 57: Subtask pod2
[2021-03-17 16:30:00,357] {logging_mixin.py:104} INFO - Running <TaskInstance: kubernetes_helloworld.pod2 2021-03-17T16:25:11.142695+00:00 [running]> on host 05nclorenzvm01.internal.cloudapp.net
[2021-03-17 16:30:00,369] {taskinstance.py:1255} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=user
AIRFLOW_CTX_DAG_OWNER=user
AIRFLOW_CTX_DAG_ID=kubernetes_helloworld
AIRFLOW_CTX_TASK_ID=pod2
AIRFLOW_CTX_EXECUTION_DATE=2021-03-17T16:25:11.142695+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-03-17T16:25:11.142695+00:00
[2021-03-17 16:32:09,805] {connectionpool.py:751} WARNING - Retrying (Retry(total=2, connect=None, read=None, redirect=None, status=None)) after connection broken by 'NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7f812fc23eb0>: Failed to establish a new connection: [Errno 110] Connection timed out')': /api/v1/namespaces/default/pods?labelSelector=dag_id%3Dkubernetes_helloworld%2Cexecution_date%3D2021-03-17T162511.1426950000-e549b02ea%2Ctask_id%3Dpod2
When I launch the pod in Kubernetes without Airflow, it runs fine. What am I doing wrong?
I have already tried the following:
- using a sleep command to keep the container from exiting
- different images, e.g. pyspark
- reinstalling Airflow and MicroK8s
Airflow v2.0.1, MicroK8s v1.3.7, Python 3.8, Ubuntu 18.04 LTS
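The timeout in the log means the Airflow worker never reaches the Kubernetes API server at all. Before touching the DAG, it is worth confirming that an out-of-cluster client on the same host can talk to MicroK8s with the same kubeconfig Airflow would use. A diagnostic sketch (these commands are my own suggestion, not from the original post; MicroK8s serves the API on port 16443 by default):

```shell
# Export the MicroK8s kubeconfig so out-of-cluster clients such as Airflow can find the cluster
microk8s config > ~/.kube/config

# Confirm the API server address and port actually recorded in that kubeconfig
kubectl cluster-info

# Check that the same image Airflow uses can be started outside Airflow
microk8s kubectl run hello-test --image=localhost:32000/hello-world:registry --restart=Never
microk8s kubectl logs hello-test
```

If `kubectl cluster-info` also hangs or times out here, the problem is network/firewall access to the MicroK8s API server, not the KubernetesPodOperator configuration.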
Solution
Unfortunately, I have not been able to figure out the problem with MicroK8s.
However, I got the KubernetesPodOperator working in Airflow with minikube. The following code runs without any problems:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow import configuration as conf
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'user',
    'start_date': days_ago(5),
    'email': ['user@airflow.de'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}

namespace = conf.get('kubernetes', 'NAMESPACE')

if namespace == 'default':
    config_file = '/home/user/.kube/config'
    in_cluster = False
else:
    in_cluster = True
    config_file = None

dag = DAG('example_kubernetes_pod',
          schedule_interval='@once',
          default_args=default_args)

with dag:
    k = KubernetesPodOperator(
        namespace=namespace,
        image="hello-world",
        labels={"foo": "bar"},
        name="airflow-test-pod",
        task_id="task-one",
        in_cluster=in_cluster,  # if set to True, looks in the cluster; if False, looks for the config file
        cluster_context='minikube',  # ignored when in_cluster is set to True
        config_file=config_file,
        is_delete_operator_pod=True,
        get_logs=True)
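The namespace check in the answer decides whether Airflow itself is running inside the cluster (use the in-cluster service account) or outside it (use a kubeconfig file). That logic can be isolated into a small helper for clarity; `select_cluster_config` is a hypothetical name of my own, not part of Airflow:

```python
def select_cluster_config(namespace, kube_config_path='/home/user/.kube/config'):
    """Mirror the answer's logic: when Airflow reports the 'default'
    namespace it is assumed to run outside the cluster and needs a
    kubeconfig file; any other namespace means in-cluster auth."""
    if namespace == 'default':
        return {'in_cluster': False, 'config_file': kube_config_path}
    return {'in_cluster': True, 'config_file': None}

# Local development: operator authenticates via ~/.kube/config
print(select_cluster_config('default'))
# Deployed inside the cluster (e.g. in an 'airflow' namespace): in-cluster auth
print(select_cluster_config('airflow'))
```

The returned dict can be unpacked straight into the operator, e.g. `KubernetesPodOperator(..., **select_cluster_config(namespace))`.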