parallel-processing - Apache Airflow:在单个 DAG 运行中运行所有并行任务
问题描述
我有一个有 30 个(或更多)动态创建的并行任务的 DAG。
我concurrency
在该 DAG 上设置了选项,因此在追赶历史记录时,我只有一个 DAG Run 运行。当我在我的服务器上运行它时,实际上只有 16 个任务并行运行,而其余 14 个任务只是等待排队。
我应该更改哪个设置,以便我只运行 1 个 DAG Run,但所有 30 多个任务并行运行?
根据这个 FAQ,它似乎是dag_concurrency
or之一max_active_runs_per_dag
,但前者似乎已经被concurrency
设置过度驱动,而后者似乎没有效果(或者我实际上搞砸了我的设置)。这是示例代码:
import datetime as dt
import logging
from airflow.operators.dummy_operator import DummyOperator
import config
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': True,
'wait_for_downstream': True,
'concurrency': 1,
'retries': 0,
}
def print_operators(ds, **kwargs):
logging.info(f"Task {kwargs.get('task_instance_key_str', 'unknown_task_instance')}")
dag = DAG(
dag_id='test_parallelism_dag',
start_date=dt.datetime(2019, 1, 1),
default_args=default_args,
schedule_interval='@daily',
catchup=True,
template_searchpath=[config.DAGS_PATH],
params={'schema': config.SCHEMA_DB},
max_active_runs=1,
)
print_operators = [PythonOperator(
task_id=f'test_parallelism_dag.print_operator_{i}',
python_callable=print_operators,
provide_context=True,
dag=dag
) for i in range(60)]
dummy_operator_start = DummyOperator(
task_id=f'test_parallelism_dag.dummy_operator_start',
)
dummy_operator_end = DummyOperator(
task_id=f'test_parallelism_dag.dummy_operator_end',
)
dummy_operator_start >> print_operators >> dummy_operator_end
编辑 1:我的当前airflow.cfg
包含:
executor = SequentialExecutor
parallelism = 32
dag_concurrency = 24
max_active_runs_per_dag = 26
我的环境变量如下(将它们全部设置为不同的,以便轻松发现哪一个有帮助):
AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__DAG_CONCURRENCY=18
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=20
AIRFLOW__CORE__WORKER_CONCURRENCY=22
哪种提示我设置 DAG_CONCURRENCY 环境变量有效。
解决方案
要更改的实际参数在 airflow.cfg 中或使用env 变量dag_concurrency
覆盖它。AIRFLOW__CORE__DAG_CONCURRENCY
根据我在问题中提到的文档:
concurrency
$concurrency
:Airflow 调度程序将在任何给定时间为您的 DAG运行不超过任务实例。并发在您的 Airflow DAG 中定义。如果您没有在 DAG 上设置并发,调度程序将使用dag_concurrency
您的airflow.cfg 条目中的默认值。
这意味着遵循简化的代码:
default_args = {
'owner': 'airflow',
'depends_on_past': True,
'wait_for_downstream': True,
'concurrency': 1,
}
dag = DAG(
dag_id='test_parallelism_dag',
default_args=default_args,
max_active_runs=1,
)
应改写为:
default_args = {
'owner': 'airflow',
'depends_on_past': True,
'wait_for_downstream': True,
}
dag = DAG(
dag_id='test_parallelism_dag',
default_args=default_args,
max_active_runs=1,
concurrency=30
)
我的代码实际上有错误的假设,即default_args
在某些时候将实际的 kwargs 替换为 DAG 构造函数。我不知道当时是什么导致我得出这个结论,但我想设置concurrency
为1
有一些草稿剩余,它实际上从未影响任何东西,并且实际 DAG 并发是从配置默认值设置的,即 16。
推荐阅读
- matrix - 带有矩阵起始向量的 sympy nsolve typeError
- r - 将标签(即“A”、“B”)添加到 facet wrap ggplot
- ruby-on-rails - 如何使用 ruby 和 parslet 解析 rtf 文本?
- java - 用于递归搜索路径并列出以特定字符串开头的所有目录的 Java 代码
- c# - 在 WPF MediaElement 中播放 HTTPS 视频 URL 的解决方法
- android - 在 Google Play 中,它显示“您需要为此应用程序上传 APK 或 Android App Bundle”
- php - 从数据 php regx 中删除一些内容
- java - 无法使用 IntelliJ 连接到 oracle 数据库,无法识别区域设置
- conemu - ConEmu - 可点击的路径,如 URL
- javascript - 访问鼠标指向的选定类