Airflow on KubernetesExecutor: recorded pid does not match the current pid. Getting AirflowException: Task received SIGTERM signal

Problem description

I am running Airflow 2.1.4 with the KubernetesExecutor on Kubernetes 1.21.0.

I am getting AirflowException: Task received SIGTERM signal.

Stack trace:

[2021-10-11 06:22:52,543] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 17
[2021-10-11 06:22:52,543] {taskinstance.py:1236} ERROR - Received SIGTERM. Terminating subprocesses.
[2021-10-11 06:22:52,560] {taskinstance.py:1463} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1165, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1283, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1313, in _execute_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 150, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 161, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/dags/a.py", line 12, in test
    time.sleep(5)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1238, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal
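The tail of the traceback (time.sleep(5) in a.py, immediately followed by signal_handler in taskinstance.py) is what a raising Python signal handler looks like: Python runs signal handlers in the main thread, the handler interrupts the sleep, and its exception propagates out of the user's callable. The sketch below reproduces only that mechanism with the standard library. TaskSigterm, signal_handler, sleeping_callable and run_with_sigterm_after are hypothetical names, not Airflow's code; it assumes a POSIX system (for signal.pthread_kill) and must be called from the main thread.

```python
import signal
import threading
import time


class TaskSigterm(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowException."""


def signal_handler(signum, frame):
    # Same idea as the handler in the trace: turn SIGTERM into an exception.
    raise TaskSigterm("Task received SIGTERM signal")


def sleeping_callable():
    # Mirrors the callable in a.py, which was sleeping when the signal arrived.
    time.sleep(5)
    return "finished"


def run_with_sigterm_after(delay: float) -> str:
    previous = signal.signal(signal.SIGTERM, signal_handler)
    main_id = threading.main_thread().ident
    # Simulate "Sending Signals.SIGTERM" by signalling the main thread after `delay` seconds.
    timer = threading.Timer(delay, signal.pthread_kill, args=(main_id, signal.SIGTERM))
    timer.start()
    try:
        return sleeping_callable()
    except TaskSigterm as exc:
        return str(exc)
    finally:
        timer.cancel()
        # Restore whatever SIGTERM handler was installed before.
        signal.signal(signal.SIGTERM, previous)


print(run_with_sigterm_after(0.1))  # Task received SIGTERM signal
```

The sleep is cut short after roughly 0.1 seconds rather than running the full 5 seconds, which is why the traceback points into the middle of the user's code.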

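Whenever the task's heartbeat_callback runs (every job_heartbeat_sec seconds, per the Airflow config), the recorded pid does not match the current pid, because this branch of the code is executed: https://github.com/apache/airflow/blob/main/airflow/jobs/local_task_job.py#L201-L203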

if ti.run_as_user or self.task_runner.run_as_user:
    recorded_pid = psutil.Process(ti.pid).ppid()
    same_process = recorded_pid == current_pid
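To see why this branch can produce the mismatch, here is a simplified model of the comparison. It assumes that when impersonation is active the task is started through a sudo wrapper, so the recorded pid (ti.pid) is a grandchild of the LocalTaskJob process and its parent should equal the pid the runner started. If the task was actually started without such a wrapper, the parent of ti.pid is the LocalTaskJob process itself and the check fails. pid_check_passes and the pid values are hypothetical; parent_of stands in for psutil.Process(pid).ppid().

```python
from typing import Callable, Dict


def pid_check_passes(
    recorded_pid: int,
    current_pid: int,
    impersonating: bool,
    parent_of: Callable[[int], int],
) -> bool:
    """Hypothetical model of the heartbeat pid comparison.

    recorded_pid: pid stored on the task instance (ti.pid)
    current_pid: pid of the process the task runner started
    impersonating: whether ti.run_as_user or task_runner.run_as_user is set
    parent_of: stand-in for psutil.Process(pid).ppid()
    """
    if impersonating:
        # Assumes a sudo wrapper sits between the runner's process and the task.
        return parent_of(recorded_pid) == current_pid
    return recorded_pid == current_pid


# Hypothetical pid tree: 100 = LocalTaskJob, 101 = process started by the runner,
# 102 = task process under sudo (exists only in the wrapped case).
PARENTS: Dict[int, int] = {101: 100, 102: 101}

# Wrapped in sudo: ti.pid = 102, its parent 101 is the started process -> passes.
wrapped = pid_check_passes(102, 101, True, PARENTS.__getitem__)
# Not wrapped, but the impersonation flag is set: ti.pid = 101, parent 100 != 101 -> fails.
unwrapped = pid_check_passes(101, 101, True, PARENTS.__getitem__)
# Not wrapped and no impersonation: direct comparison 101 == 101 -> passes.
plain = pid_check_passes(101, 101, False, PARENTS.__getitem__)

print(wrapped, unwrapped, plain)  # True False True
```

Only the middle case, where the ppid branch is taken but no wrapper process exists, reproduces the "recorded pid does not match the current pid" failure.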

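I modified the source to print the values of run_as_user at this point:

ti.run_as_user = None
self.task_runner.run_as_user = airflow

I have tried changing the job_heartbeat_sec value, but the task still fails every time. I am running the scheduler and the webserver as the airflow user, and the DAG is also triggered by the airflow user. I have also tried setting run_as_user in the DAG definition to None or to airflow, but I get the same error.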
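One possible explanation for ti.run_as_user = None alongside self.task_runner.run_as_user = airflow, stated as an assumption because the [core] section of the config is not shown: the task runner may fall back to [core] default_impersonation when the task has no run_as_user, but only prepend sudo when that user differs from the user the worker runs as. With everything running as airflow, no sudo wrapper would be added, yet the heartbeat still takes the ppid branch. The helpers below are hypothetical names modelling that decision; they are not Airflow functions.

```python
from typing import Optional


def runner_user(ti_run_as_user: Optional[str], default_impersonation: Optional[str]) -> Optional[str]:
    # Assumed fallback: the task's own run_as_user, else [core] default_impersonation.
    return ti_run_as_user or default_impersonation or None


def wraps_in_sudo(user: Optional[str], current_user: str) -> bool:
    # Assumed guard: sudo is only prepended when switching to a different user.
    return bool(user) and user != current_user


def heartbeat_checks_ppid(ti_run_as_user: Optional[str], user: Optional[str]) -> bool:
    # Same condition as the quoted heartbeat code:
    # `if ti.run_as_user or self.task_runner.run_as_user:`
    return bool(ti_run_as_user or user)


# Values matching the printout in the question; the default_impersonation value
# is an assumption, and the worker is assumed to run as "airflow".
user = runner_user(None, "airflow")
sudo = wraps_in_sudo(user, current_user="airflow")
ppid_branch = heartbeat_checks_ppid(None, user)

# No sudo wrapper, but the ppid branch is taken: this combination yields the mismatch.
print(user, sudo, ppid_branch)  # airflow False True
```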

Setting up the airflow user in the Dockerfile:

RUN chown -R airflow: /usr/local/bin/airflow
RUN chown -R airflow: /root
USER airflow
WORKDIR ${AIRFLOW_HOME}

Airflow configuration:

[scheduler]
job_heartbeat_sec = 30
clean_tis_without_dagrun_interval = 15.0
scheduler_heartbeat_sec = 10
num_runs = -1
processor_poll_interval = 1
min_file_process_interval = 0
dag_dir_list_interval = 300
print_stats_interval = 30
pool_metrics_interval = 5.0
scheduler_health_check_threshold = 295
orphaned_tasks_check_interval = 305.0
child_process_log_directory = /root/airflow/logs/scheduler
scheduler_zombie_task_threshold = 300
catchup_by_default = True
max_tis_per_query = 512
use_row_level_locking = True
max_dagruns_to_create_per_loop = 10
max_dagruns_per_loop_to_schedule = 20
schedule_after_task_execution = False
parsing_processes = 2
file_parsing_sort_mode = modified_time
use_job_schedule = True
allow_trigger_in_future = False
dependency_detector = airflow.serialization.serialized_objects.DependencyDetector
run_duration = -1
max_threads = 2
authenticate = False


[kubernetes]
pod_template_file = /usr/local/airflow/pod_templates/pod_template_file.yaml
worker_container_repository = ************************
worker_container_tag = ****************************
namespace = default
delete_worker_pods = False
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 1
multi_namespace_mode = False
in_cluster = True
kube_client_request_args =
delete_option_kwargs =
enable_tcp_keepalive = False
tcp_keep_idle = 120
tcp_keep_intvl = 30
tcp_keep_cnt = 6
verify_ssl = True
worker_pods_pending_timeout = 300
worker_pods_pending_timeout_check_interval = 120
worker_pods_pending_timeout_batch_size = 100
airflow_configmap = airflow-configmap
airflow_local_settings_configmap = airflow-configmap

Tags: airflow, airflow-2.x

Solution


The bug fix should be included in 2.2.3.

