airflow - 气流调度程序内存不足问题
问题描述
我们正在试验 Apache Airflow(版本 1.10rc2,使用 python 2.7)并将其部署到 kubernetes、webserver 和调度程序到不同的 pod,数据库也使用云 sql,但我们一直面临调度程序内存不足的问题荚。
在 OOM 的那一刻,我们只运行了 4 个示例 Dag(大约 20 个任务)。pod 的内存为 1Gib。我在其他帖子中看到,一个任务在运行时可能会消耗大约 50Mib 的内存,并且所有任务操作都在内存中,没有任何内容刷新到磁盘,所以这已经是 1Gb。
有没有什么经验法则可以用来计算基于并行任务的调度程序需要多少内存?
除了降低并行度之外,是否有任何调整可以减少调度程序本身的内存使用?
我认为我们的用例不需要 Dask 或 Celery 来水平扩展 Airflow,并为工人配备更多机器。
关于配置的更多细节:
executor = Localexecutor
parallelism = 10
dag_concurrency = 5
max_active_runs_per_dag = 2
workers = 1
worker_concurrency = 16
min_file_process_interval = 1
min_file_parsing_loop_time = 5
dag_dir_list_interval = 30
当时运行的 dag 是 example_bash_operator、example_branch_operator、example_python_operator 和我们开发的一个 quickDag。
在某些情况下,它们都只是简单的任务/操作符,如 DummyOperators、BranchOperatos、BashOperators,但只做 echo 或 sleep,而 PythonOperators 也只做睡眠。总共大约有 40 个任务,但并非所有任务都并行运行,因为其中一些是下游、依赖等,我们的并行度设置为 10,如上所述只有一个工作人员,并且dag_concurrency
设置为到 5。
我在气流日志中看不到任何异常,在任务日志中也没有。
只运行这些 dags 之一,气流似乎正在相应地工作。
我可以在调度程序 pod 中看到很多调度程序进程,每个进程使用 0.2% 或更多的内存:
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
461384 airflow 20 0 836700 127212 23908 S 36.5 0.4 0:01.19 /usr/bin/python /usr/bin/airflow scheduler 461397 airflow 20 0 356168 86320 5044 R 14.0 0.3 0:00.42 /usr/bin/python /usr/bin/airflow scheduler 44 airflow 20 0 335920 71700 10600 S 28.9 0.2 403:32.05 /usr/bin/python /usr/bin/airflow scheduler 56 airflow 20 0 330548 59164 3524 S 0.0 0.2 0:00.02
这是使用 0.3% 内存运行的任务之一:
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
462042 airflow 20 0 282632 91120 10544 S 1.7 0.3 0:02.66 /usr/bin/python /usr/bin/airflow run example_bash_operator runme_1 2018-08-29T07:39:48.193735+00:00 --local -sd /usr/lib/python2.7/site-packages/apache_airflow-1.10.0-py2.7.egg/airflow/example_dags/example_bash_operator.py
解决方案
实际上并没有一个简洁的经验法则可以遵循,因为它可能会根据您的工作流程而变化很大。
如您所见,调度程序将创建几个 fork 进程。此外,每个任务(除了 Dummy)都将在它自己的进程中运行。根据操作员和正在处理的数据,每个任务所需的内存量可能会有很大差异。
并行度设置将直接限制在所有 dag 运行/任务中同时运行的任务数量,这对您使用 LocalExecutor 产生最显着的影响。您也可以尝试将max_threads
under设置[scheduler]
为 1。
因此,一个(非常)普遍的经验法则是对资源慷慨:
[256 for scheduler itself] + ( [parallelism] * (100MB + [size of data you'll process]) )
根据您是加载完整数据集还是在任务执行过程中处理数据块,数据大小需要更改的地方。
即使您认为不需要扩展集群,我仍然建议使用 CeleryExecutor,即使只是将调度程序和任务相互隔离。这样,如果您的调度程序或 celery 工人死亡,它不会同时关闭。特别是在 k8 中运行,如果您的调度程序 sigterms 它将与任何正在运行的任务一起杀死它。如果您在不同的 pod 中运行它们并且调度程序 pod 重新启动,那么您可以不间断地完成任务。如果您有更多的工人,它将减少其他任务的内存/处理峰值的影响。
推荐阅读
- asp.net - 在发布时更改 .config 文件中的 appender 参数值(天蓝色管道)
- python - 在 PYTHONSTARTUP 中设置的 .pythonstartup 文件中打开 Python 解释器时如何读取当前目录?
- ios - ionic 4中的iOS推送通知没有声音
- android - 在颤振上动态添加choicechip
- java - FileUtils.copyFile() 在尝试复制 exe 时使程序崩溃
- php - laravel swagger 无法与 App 目录之外的控制器一起使用
- kotlin - 如何在 Kotlin 中反映可为空的字段
- node.js - 如何从 greenlock-express 获取证书文件以用于 Nginx?
- php - 如何在使用 php 上传之前裁剪图像?
- python - 在 Django 过滤方法中使用列字段