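python - Airflow: "already registered for DAG" warnings when dynamically generating tasks
Question
Recently I started using the TaskFlow API in some of my DAG files that generate tasks dynamically, and I began noticing (a lot of) warning messages in the logs. Here is a dummy DAG file that produces this message: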
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

NUMS = [1, 2]

default_args = {
    "owner": "henrique",
    "depends_on_past": False,
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=15),
}


def print_id(num: int):
    print(num)
    return num


def run_tests():
    # Dynamically create one TaskFlow task per number, e.g. run_1 and run_2
    results = []
    for i in NUMS:
        result = task(task_id=f"run_{i}")(print_id)(i)
        results.append(result)
    return results


@task()
def agg(results):
    print(results)


@dag(
    "test_tg",
    default_args=default_args,
    schedule_interval="@once",
    start_date=days_ago(1),
    max_active_runs=1,
)
def test_supervisor():
    task_start = DummyOperator(task_id="task_start")
    task_end = DummyOperator(task_id="task_end")
    groups = []
    for i in NUMS:
        # Inside the group, task_ids get the "<group_id>." prefix, e.g. 1_num_group.run_1
        with TaskGroup(group_id=f"{i}_num_group") as tg:
            results = run_tests()
            # Passing the XComArgs makes agg downstream of every run_* task in the group
            aggregation = agg(results)
        groups.append(tg)
    task_start >> groups >> task_end


data_dag = test_supervisor()
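For reference, the structure this file is meant to produce can be modelled without Airflow. The sketch below is stdlib-only and is not Airflow code: `build_graph` is a hypothetical helper, and it assumes that a TaskGroup prefixes task_ids with `<group_id>.` and that `>>` on a group attaches to the group's root tasks (incoming) and leaf tasks (outgoing).

```python
# Hypothetical, stdlib-only model of the graph the test_tg DAG file should build.
NUMS = [1, 2]


def build_graph(nums):
    """Return (task_ids, edges) where each edge is an (upstream, downstream) pair."""
    task_ids = ["task_start", "task_end"]
    edges = set()
    for g in nums:
        prefix = f"{g}_num_group."
        runs = [f"{prefix}run_{i}" for i in nums]  # roots of the group
        agg = f"{prefix}agg"                         # leaf of the group
        task_ids += runs + [agg]
        for run in runs:
            edges.add((run, agg))           # agg(results) consumes every run_* result
            edges.add(("task_start", run))  # task_start >> group attaches to the roots
        edges.add((agg, "task_end"))        # group >> task_end leaves from the leaf
    return task_ids, edges


if __name__ == "__main__":
    tasks, deps = build_graph(NUMS)
    print(len(tasks), "tasks,", len(deps), "edges")
    for up, down in sorted(deps):
        print(f"{up} >> {down}")
```

With `NUMS = [1, 2]` it prints `8 tasks, 10 edges`. Each `run_* >> agg` pair is one edge, which is the kind of edge named in the warnings.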
When running this DAG, I started getting a lot of the following warning messages:
"[2021-08-24 09:46:46,438] {baseoperator.py:1301} WARNING - Dependency <Task(_PythonDecoratedOperator): 2_num_group.agg>, 2_num_group.run_2 already registered for DAG: test_tg"
"[2021-08-24 09:46:46,438] {baseoperator.py:1301} WARNING - Dependency <Task(_PythonDecoratedOperator): 2_num_group.run_2>, 2_num_group.agg already registered for DAG: test_tg"
"[2021-08-24 09:46:46,437] {baseoperator.py:1301} WARNING - Dependency <Task(_PythonDecoratedOperator): 2_num_group.run_1>, 2_num_group.agg already registered for DAG: test_tg"
"[2021-08-24 09:46:46,413] {baseoperator.py:1301} WARNING - Dependency <Task(_PythonDecoratedOperator): 1_num_group.agg>, 1_num_group.run_1 already registered for DAG: test_tg"
....
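These messages keep appearing even when the DAG is not running, and even while it is paused.
Am I doing something wrong when creating the tasks?
Thanks in advance
Solution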
I tried your code and it works fine; I didn't get any of the warnings you mentioned. I'm running Airflow v2.1.2 using the official docker-compose setup.
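Since the behaviour seems to depend on the Airflow version, it is worth confirming which version your environment actually has before comparing it with the v2.1.2 setup above. A minimal stdlib sketch, assuming Python 3.8+ (for `importlib.metadata`) and that Airflow is installed under the distribution name `apache-airflow`; `parse_version` and `installed_airflow_version` are hypothetical helpers, and 2.1.2 is only the version used in this answer, not a confirmed fix version. Running `airflow version` on the command line gives the same information.

```python
from importlib.metadata import PackageNotFoundError, version


def parse_version(text):
    """Keep the leading numeric components: '2.1.2' -> (2, 1, 2), '2.2.0rc1' -> (2, 2)."""
    parts = []
    for piece in text.split("."):
        if not piece.isdigit():
            break
        parts.append(int(piece))
    return tuple(parts)


def installed_airflow_version():
    """Return the installed apache-airflow version string, or None if it is not installed."""
    try:
        return version("apache-airflow")
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    current = installed_airflow_version()
    if current is None:
        print("apache-airflow is not installed in this environment")
    elif parse_version(current) < parse_version("2.1.2"):
        print(f"Airflow {current} is older than 2.1.2, the version used in this answer")
    else:
        print(f"Airflow {current}")
```

Run it with the same Python interpreter your scheduler uses, otherwise it may report a different installation.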
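I found some issues in the Airflow repo (pr, pr) related to the message you're getting in older versions, but they should be resolved by now. Try upgrading to the latest version of Airflow; that should fix the problem.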
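For context on what the warning means: as far as I can tell, in Airflow 2.1.x it comes from a helper in `baseoperator.py` that stores upstream and downstream task ids in sets and, when an id is already present (i.e. the same dependency is registered a second time), logs this warning instead of failing. It is emitted while the DAG file is parsed, which would also explain why it shows up for a paused DAG: the scheduler keeps re-parsing DAG files regardless of pause state. The sketch below is a simplified, stdlib-only imitation of that behaviour, not Airflow's code; `ToyTask` and its methods are hypothetical.

```python
import logging

log = logging.getLogger("toy_dag")


class ToyTask:
    """Hypothetical stand-in for an operator that only tracks dependency ids."""

    def __init__(self, task_id, dag_id):
        self.task_id = task_id
        self.dag_id = dag_id
        self.upstream_ids = set()
        self.downstream_ids = set()

    def _add_only_new(self, id_set, other_id):
        # A repeated edge is not an error here: it is skipped and only a warning is logged.
        if other_id in id_set:
            log.warning(
                "Dependency %s, %s already registered for DAG: %s",
                self.task_id, other_id, self.dag_id,
            )
        else:
            id_set.add(other_id)

    def set_upstream(self, other):
        # The edge is stored on both tasks, so a duplicate edge produces two warnings,
        # one from each side (compare the paired messages in the question).
        other._add_only_new(other.downstream_ids, self.task_id)
        self._add_only_new(self.upstream_ids, other.task_id)


if __name__ == "__main__":
    logging.basicConfig(level=logging.WARNING)
    run_1 = ToyTask("1_num_group.run_1", "test_tg")
    agg = ToyTask("1_num_group.agg", "test_tg")
    agg.set_upstream(run_1)  # first registration: silent
    agg.set_upstream(run_1)  # same edge again: two warnings, sets unchanged
```

In the question's DAG this would correspond to each `run_* -> agg` edge being registered twice during parsing; I haven't traced which call does that in the affected versions. In this model the duplicate call leaves the graph unchanged, so the message is noise rather than a broken dependency.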
Edit:
Here is what I got after copying and pasting your code into my running Airflow instance:
[Figure: Graph view of the test_tg DAG]
Logs:
airflow dags test test_tg 2021-08-24
Output:
[2021-08-24 15:59:19,247] {dagbag.py:496} INFO - Filling up the DagBag from /opt/airflow/dags
[2021-08-24 15:59:19,865] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_tg.task_start 2021-08-24 00:00:00+00:00 [queued]>']
[2021-08-24 15:59:24,991] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_tg
AIRFLOW_CTX_TASK_ID=task_start
AIRFLOW_CTX_EXECUTION_DATE=2021-08-24T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-08-24T00:00:00+00:00
[2021-08-24 15:59:25,003] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=test_tg, task_id=task_start, execution_date=20210824T000000, start_date=20210824T120558, end_date=20210824T155925
[2021-08-24 15:59:25,043] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-24 15:59:25,083] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 7 | succeeded: 1 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 7
[2021-08-24 15:59:25,122] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_tg.1_num_group.run_1 2021-08-24 00:00:00+00:00 [queued]>']
[2021-08-24 15:59:25,164] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_tg.1_num_group.run_2 2021-08-24 00:00:00+00:00 [queued]>']
[2021-08-24 15:59:25,241] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_tg.2_num_group.run_1 2021-08-24 00:00:00+00:00 [queued]>']
[2021-08-24 15:59:25,279] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_tg.2_num_group.run_2 2021-08-24 00:00:00+00:00 [queued]>']
[2021-08-24 15:59:29,814] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_tg
AIRFLOW_CTX_TASK_ID=1_num_group.run_1
AIRFLOW_CTX_EXECUTION_DATE=2021-08-24T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-08-24T00:00:00+00:00
1
[2021-08-24 15:59:29,849] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=test_tg, task_id=1_num_group.run_1, execution_date=20210824T000000, start_date=20210824T120558, end_date=20210824T155929
[2021-08-24 15:59:29,876] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-24 15:59:29,912] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_tg
AIRFLOW_CTX_TASK_ID=1_num_group.run_2
AIRFLOW_CTX_EXECUTION_DATE=2021-08-24T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-08-24T00:00:00+00:00
2
[2021-08-24 15:59:29,932] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=test_tg, task_id=1_num_group.run_2, execution_date=20210824T000000, start_date=20210824T120558, end_date=20210824T155929
[2021-08-24 15:59:29,965] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-24 15:59:30,027] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_tg
AIRFLOW_CTX_TASK_ID=2_num_group.run_1
AIRFLOW_CTX_EXECUTION_DATE=2021-08-24T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-08-24T00:00:00+00:00
1
[2021-08-24 15:59:30,060] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=test_tg, task_id=2_num_group.run_1, execution_date=20210824T000000, start_date=20210824T120558, end_date=20210824T155930
[2021-08-24 15:59:30,087] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-24 15:59:30,124] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_tg
AIRFLOW_CTX_TASK_ID=2_num_group.run_2
AIRFLOW_CTX_EXECUTION_DATE=2021-08-24T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-08-24T00:00:00+00:00
2
[2021-08-24 15:59:30,151] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=test_tg, task_id=2_num_group.run_2, execution_date=20210824T000000, start_date=20210824T120558, end_date=20210824T155930
[2021-08-24 15:59:30,185] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-24 15:59:30,238] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 3 | succeeded: 5 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 3
[2021-08-24 15:59:30,275] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_tg.1_num_group.agg 2021-08-24 00:00:00+00:00 [queued]>']
[2021-08-24 15:59:30,310] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_tg.2_num_group.agg 2021-08-24 00:00:00+00:00 [queued]>']
[2021-08-24 15:59:34,826] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_tg
AIRFLOW_CTX_TASK_ID=1_num_group.agg
AIRFLOW_CTX_EXECUTION_DATE=2021-08-24T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-08-24T00:00:00+00:00
[1, 2]
[2021-08-24 15:59:34,833] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=test_tg, task_id=1_num_group.agg, execution_date=20210824T000000, start_date=20210824T120558, end_date=20210824T155934
[2021-08-24 15:59:34,859] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-24 15:59:34,904] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_tg
AIRFLOW_CTX_TASK_ID=2_num_group.agg
AIRFLOW_CTX_EXECUTION_DATE=2021-08-24T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-08-24T00:00:00+00:00
[1, 2]
[2021-08-24 15:59:34,915] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=test_tg, task_id=2_num_group.agg, execution_date=20210824T000000, start_date=20210824T120558, end_date=20210824T155934
[2021-08-24 15:59:34,945] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-24 15:59:34,982] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 7 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
[2021-08-24 15:59:35,014] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_tg.task_end 2021-08-24 00:00:00+00:00 [queued]>']
[2021-08-24 15:59:39,829] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_tg
AIRFLOW_CTX_TASK_ID=task_end
AIRFLOW_CTX_EXECUTION_DATE=2021-08-24T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-08-24T00:00:00+00:00
[2021-08-24 15:59:39,841] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=test_tg, task_id=task_end, execution_date=20210824T000000, start_date=20210824T120558, end_date=20210824T155939
[2021-08-24 15:59:39,867] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-24 15:59:39,890] {dagrun.py:444} INFO - Marking run <DagRun test_tg @ 2021-08-24 00:00:00+00:00: backfill__2021-08-24T00:00:00+00:00, externally triggered: False> successful
[2021-08-24 15:59:39,898] {backfill_job.py:388} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 8 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2021-08-24 15:59:39,905] {backfill_job.py:831} INFO - Backfill done. Exiting.
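To compare runs before and after an upgrade, here is a small stdlib sketch that scans log text for the warning quoted in the question and counts occurrences. The regex is built only from the message format shown in the question, and `count_duplicate_dependency_warnings` is a hypothetical helper.

```python
import re
from collections import Counter

# Built only from the message format quoted in the question, e.g.
# WARNING - Dependency <Task(_PythonDecoratedOperator): 2_num_group.agg>, 2_num_group.run_2 already registered for DAG: test_tg
WARNING_RE = re.compile(
    r"Dependency <Task\([^)]*\): (?P<task>[^>]+)>, (?P<other>[\w.-]+) "
    r"already registered for DAG: (?P<dag>[\w.-]+)"
)


def count_duplicate_dependency_warnings(lines):
    """Count 'already registered' warnings per (dag_id, task_id, other_task_id)."""
    counts = Counter()
    for line in lines:
        match = WARNING_RE.search(line)
        if match:
            counts[(match["dag"], match["task"], match["other"])] += 1
    return counts
```

For example, save the output of `airflow dags test test_tg 2021-08-24` to a file and pass the open file object. For the output above the result is empty, since no line contains the warning.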
airflow info
Output:
default@91172692e679:/opt/airflow$ airflow info
Apache Airflow
version | 2.1.2
executor | CeleryExecutor
task_logging_handler | airflow.utils.log.file_task_handler.FileTaskHandler
sql_alchemy_conn | postgresql+psycopg2://airflow:airflow@postgres/airflow
dags_folder | /opt/airflow/dags
plugins_folder | /opt/airflow/plugins
base_log_folder | /opt/airflow/logs
remote_base_log_folder |
...
...