python - 在 Apache Airflow 中运行超过 32 个并发任务
问题描述
我正在运行 Apache Airflow 1.8.1。我想在我的实例上运行超过 32 个并发任务,但无法让任何配置工作。
我正在使用 CeleryExecutor,UI 中的 Airflow 配置显示为 64,parallelism
并且dag_concurrency
我已经多次重新启动了 Airflow 调度程序、Web 服务器和工作程序(我实际上是在 Vagrant 机器上进行本地测试,但也已经在EC2 实例)。
气流.cfg
# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 64
# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 64
示例 DAG。我已经concurrency
直接在 DAG 中尝试了不带参数和带参数的方法。
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
dag = DAG(
'concurrency_dev',
default_args={
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 1),
},
schedule_interval=None,
catchup=False
)
for i in range(0, 40):
BashOperator(
task_id='concurrency_dev_{i}'.format(i=i),
bash_command='sleep 60',
dag=dag
)
无论如何,只有 32 个任务同时执行。
解决方案
如果您有 2 个工作人员,celeryd_concurrency = 16
那么您将被限制为 32 个任务。如果non_pooled_task_slot_count = 32
你也受到限制。当然parallelism
,dag_concurrency
不仅网络服务器和调度程序需要设置在 32 以上,工作人员也需要设置。
推荐阅读
- react-native - 获取图标以使用 Vue native / React Native - createMaterialBottomTabNavigator
- sql - 尝试将值插入 sql 时如何避免以下错误?
- c# - 将 Web 服务 API 中的 XML 字符串反序列化为 C# 对象
- java - 如何添加选项以在 Javadoc 注释中显示 @tags
- sql - 与用户之前的订单相比,如何计算订单的相似度?
- haskell - 如何声明两种类型的组合是一个monad
- javascript - 如何让函数接受多种文件类型?
- apache - 在通配符 vhost apache (xampp) 设置中重定向子子域
- javascript - 如何访问我的复选框组件状态?
- r - R - stat_compare_means 从 Kruskal-Wallis 测试返回不同的值