首页 > 解决方案 > 如何正确更新作业状态

问题描述

据我所知,当大多数人想知道一个KubernetesSpark甚至)Job是否完成时,他们会在某处启动某种循环,以定期检查是否Job完成了相应的API.

现在,我正在使用( ) 运算符(在下面)Kubernetes在后台执行此操作:disown&bashPython

import subprocess

cmd = f'''
kubectl wait \\
    --for=condition=complete \\
    --timeout=-1s \\
    job/job_name \\
    > logs/kube_wait_log.txt \\
    &
'''

kube_listen = subprocess.run(
    cmd,
    shell = True,
    stdout = subprocess.PIPE
)

所以......我实际上有两个(相关的)问题:

  1. shell除了操作员之外,还有没有更好的方法在后台执行此&操作?
  2. 我认为最好的选项实际上是cURL从内部使用Job来更新Local Server APIKubernetes.
    • 但是,我不知道如何cURLJob. 是否可以?
    • 我想您将不得不在某个地方公开端口,但是在哪里?它真的支持吗?你能创建一个Kubernetes Service来管理端口和连接吗?

标签: kuberneteskubernetes-jobs

解决方案


如果您不想阻止运行到完成的进程,则可以创建一个subprocess.Popen实例。一旦你有了这个,你可以poll()看看它是否完成了。(如果可能的话,你应该非常非常努力地避免使用shell=True。)所以它的一种变体可能看起来像(未经测试):

with open('logs/kube_wait_log.txt', 'w') as f:
  with subprocess.Popen(['kubectl', 'wait',
                         '--for=condition=complete',
                         '--timeout=-1s',
                         'job/job_name'],
                         stdin=subprocess.DEVNULL,
                         stdout=f,
                         stderr=subprocess.STDOUT) as p:
    while True:
      if p.poll():
        job_is_complete()
        break
      time.sleep(1)

kubectl不过,比使用官方的 Kubernetes Python 客户端库更好。与其使用“等待”操作,不如观察相关作业对象并查看其状态是否更改为“已完成”。这可能看起来大致像(未经测试):

from kubernetes import client, watch
jobsv1 = client.BatchV1Api()
w = watch.watch()
for event in w.stream(jobsv1.read_namespaced_job, 'job_name', 'default'):
  job = event['object']
  if job.status.completion_time is not None:
    job_is_complete()
    break

Job 的 Pod 不需要通过 Kubernetes 服务器更新自己的状态。它只需要在完成后以成功的状态码 (0) 退出,这将反映在 Job 的状态字段中。


推荐阅读