Airflow: how to use the same tasks in different DAGs

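Problem description

I'm learning Airflow and have run into a problem.

I have 2 tasks that I want to use in several DAGs. The only difference between these tasks is the parameters that the operators receive.

This could be done by simply copying and pasting the tasks into every DAG, but maintaining that kind of code would be a nightmare. What I want to do instead is create a class containing the tasks I will call multiple times, and then import this class from the DAGs.

I reproduced the problem with a minimal example.

Here is the code for the class: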

from airflow.operators.bash_operator import BashOperator


class Operator_generator():
    _instance = None

    def __init__(self, var1, var2):
        # Parameters meant to customize the operators (not used yet in this minimal example)
        self.var1 = var1
        self.var2 = var2

    def create_task_1(self):
        # {{ds}} and {{prev_ds}} are rendered by Airflow when the task runs
        return BashOperator(
            task_id='task1',
            bash_command='echo Im running task 1, the current execution date is {{ds}} and the previous execution date is {{prev_ds}}'
        )

    def create_task_2(self):
        return BashOperator(
            task_id='task2',
            bash_command='echo Im running task 2, the current execution date is {{ds}} and the previous execution date is {{prev_ds}}'
        )

Here is an example DAG in which I import the class:

from include.src.date.decorator import DefaultDateTime
from airflow import DAG
from include.src.airflow.xcom import cleanup
from operator_creator import Operator_generator

dag_id = "dag1"

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": DefaultDateTime(2021, 6, 1),
    'retries': 1
}


# DAG definition
with DAG(
        dag_id,
        # Pass default_args so start_date and retries apply to the tasks
        default_args=default_args,
        schedule_interval='@monthly',
        catchup=False,
        on_failure_callback=cleanup,
        on_success_callback=cleanup
) as dag:
    dag.doc_md = __doc__

    operator_generator = Operator_generator('var1', 'var2')

    # The operators are created inside the DAG context, so they attach to this DAG
    task1 = operator_generator.create_task_1()
    task2 = operator_generator.create_task_2()

    task1 >> task2

Note that "var1" and "var2" are the variables I need in order to parametrize the operators.
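In the minimal example the class stores var1 and var2 but never passes them to the operators. A rough sketch of how they might reach the operator arguments, assuming (hypothetically) that they should show up in the task ID and in the Bash command. The helper name `build_bash_task_kwargs` is not from the original code, and the sketch only builds the keyword arguments, so it runs without Airflow installed:

```python
def build_bash_task_kwargs(name, var1, var2):
    """Return keyword arguments for a BashOperator (hypothetical helper).

    The Jinja placeholder {{ ds }} is kept literal so that Airflow can
    render it when the task runs.
    """
    return {
        # Including var1 keeps task IDs distinct when the helper is reused
        "task_id": f"{name}_{var1}",
        "bash_command": (
            f"echo Running {name} with var1={var1} var2={var2}, "
            "execution date is {{ ds }}"
        ),
    }


kwargs = build_bash_task_kwargs("task1", "var1", "var2")
print(kwargs["task_id"])
print(kwargs["bash_command"])
# In a DAG file this would be used as: BashOperator(**kwargs)
```

Keeping the argument-building separate from the operator itself also makes the parametrization easy to check outside of a running scheduler.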

The problem is that when I run the DAG, the tasks run twice:

[2021-08-25 16:29:46,937] {taskinstance.py:880} INFO - Starting attempt 1 of 2
[2021-08-25 16:29:46,937] {taskinstance.py:881} INFO - 
--------------------------------------------------------------------------------
[2021-08-25 16:29:46,955] {taskinstance.py:900} INFO - Executing <Task(BashOperator): task1> on 2021-07-01T06:00:00+00:00
[2021-08-25 16:29:46,961] {standard_task_runner.py:53} INFO - Started process 67689 to run task
[2021-08-25 16:29:47,011] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: dag1.task1 2021-07-01T06:00:00+00:00 [running]> 30b770753547
[2021-08-25 16:29:47,032] {bash_operator.py:113} INFO - Tmp dir root location: 
 /tmp
[2021-08-25 16:29:47,033] {bash_operator.py:136} INFO - Temporary script location: /tmp/airflowtmpixijgd4s/task1lfcwdvfa
[2021-08-25 16:29:47,033] {bash_operator.py:146} INFO - Running command: echo Im running task 1, the current execution date is 2021-07-01 and the previous execution date is 2021-06-01
[2021-08-25 16:29:47,039] {bash_operator.py:153} INFO - Output:
[2021-08-25 16:29:47,040] {bash_operator.py:157} INFO - Im running task 1, the current execution date is 2021-07-01 and the previous execution date is 2021-06-01
[2021-08-25 16:29:47,040] {bash_operator.py:161} INFO - Command exited with return code 0
[2021-08-25 16:29:47,052] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=dag1, task_id=task1, execution_date=20210701T060000, start_date=20210825T162946, end_date=20210825T162947
[2021-08-25 16:29:55,335] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: dag1.task1 2021-08-25T16:29:41+00:00 [queued]>
[2021-08-25 16:29:55,335] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: dag1.task2 2021-07-01T06:00:00+00:00 [queued]>
[2021-08-25 16:29:55,348] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: dag1.task1 2021-08-25T16:29:41+00:00 [queued]>
[2021-08-25 16:29:55,348] {taskinstance.py:879} INFO - 
--------------------------------------------------------------------------------
[2021-08-25 16:29:55,348] {taskinstance.py:880} INFO - Starting attempt 1 of 2
[2021-08-25 16:29:55,348] {taskinstance.py:881} INFO - 
--------------------------------------------------------------------------------
[2021-08-25 16:29:55,357] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: dag1.task2 2021-07-01T06:00:00+00:00 [queued]>
[2021-08-25 16:29:55,357] {taskinstance.py:879} INFO - 
--------------------------------------------------------------------------------
[2021-08-25 16:29:55,357] {taskinstance.py:880} INFO - Starting attempt 1 of 2
[2021-08-25 16:29:55,357] {taskinstance.py:881} INFO - 
--------------------------------------------------------------------------------
[2021-08-25 16:29:55,363] {taskinstance.py:900} INFO - Executing <Task(BashOperator): task1> on 2021-08-25T16:29:41+00:00
[2021-08-25 16:29:55,366] {standard_task_runner.py:53} INFO - Started process 67809 to run task
[2021-08-25 16:29:55,370] {taskinstance.py:900} INFO - Executing <Task(BashOperator): task2> on 2021-07-01T06:00:00+00:00
[2021-08-25 16:29:55,374] {standard_task_runner.py:53} INFO - Started process 67810 to run task
[2021-08-25 16:29:55,412] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: dag1.task1 2021-08-25T16:29:41+00:00 [running]> 30b770753547
[2021-08-25 16:29:55,422] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: dag1.task2 2021-07-01T06:00:00+00:00 [running]> 30b770753547
[2021-08-25 16:29:55,430] {bash_operator.py:113} INFO - Tmp dir root location: 
 /tmp
[2021-08-25 16:29:55,432] {bash_operator.py:136} INFO - Temporary script location: /tmp/airflowtmpsacovlfm/task1doc6fakb
[2021-08-25 16:29:55,432] {bash_operator.py:146} INFO - Running command: echo Im running task 1, the current execution date is 2021-08-25 and the previous execution date is 2021-08-25
[2021-08-25 16:29:55,440] {bash_operator.py:153} INFO - Output:
[2021-08-25 16:29:55,440] {bash_operator.py:157} INFO - Im running task 1, the current execution date is 2021-08-25 and the previous execution date is 2021-08-25
[2021-08-25 16:29:55,441] {bash_operator.py:161} INFO - Command exited with return code 0
[2021-08-25 16:29:55,444] {bash_operator.py:113} INFO - Tmp dir root location: 
 /tmp
[2021-08-25 16:29:55,445] {bash_operator.py:136} INFO - Temporary script location: /tmp/airflowtmpyqqww8an/task2i29a2lk7
[2021-08-25 16:29:55,445] {bash_operator.py:146} INFO - Running command: echo Im running task 2, the current execution date is 2021-07-01 and the previous execution date is 2021-06-01
[2021-08-25 16:29:55,451] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=dag1, task_id=task1, execution_date=20210825T162941, start_date=20210825T162955, end_date=20210825T162955
[2021-08-25 16:29:55,453] {bash_operator.py:153} INFO - Output:
[2021-08-25 16:29:55,453] {bash_operator.py:157} INFO - Im running task 2, the current execution date is 2021-07-01 and the previous execution date is 2021-06-01
[2021-08-25 16:29:55,454] {bash_operator.py:161} INFO - Command exited with return code 0
[2021-08-25 16:29:55,465] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=dag1, task_id=task2, execution_date=20210701T060000, start_date=20210825T162955, end_date=20210825T162955
[2021-08-25 16:29:56,922] {logging_mixin.py:112} INFO - [2021-08-25 16:29:56,921] {local_task_job.py:103} INFO - Task exited with return code 0
[2021-08-25 16:30:05,333] {logging_mixin.py:112} INFO - [2021-08-25 16:30:05,333] {local_task_job.py:103} INFO - Task exited with return code 0
[2021-08-25 16:30:05,337] {logging_mixin.py:112} INFO - [2021-08-25 16:30:05,337] {local_task_job.py:103} INFO - Task exited with return code 0
[2021-08-25 16:30:06,794] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: dag1.task2 2021-08-25T16:29:41+00:00 [queued]>
[2021-08-25 16:30:06,809] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: dag1.task2 2021-08-25T16:29:41+00:00 [queued]>
[2021-08-25 16:30:06,809] {taskinstance.py:879} INFO - 
--------------------------------------------------------------------------------
[2021-08-25 16:30:06,810] {taskinstance.py:880} INFO - Starting attempt 1 of 2
[2021-08-25 16:30:06,810] {taskinstance.py:881} INFO - 
--------------------------------------------------------------------------------
[2021-08-25 16:30:06,822] {taskinstance.py:900} INFO - Executing <Task(BashOperator): task2> on 2021-08-25T16:29:41+00:00
[2021-08-25 16:30:06,826] {standard_task_runner.py:53} INFO - Started process 67937 to run task
[2021-08-25 16:30:06,875] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: dag1.task2 2021-08-25T16:29:41+00:00 [running]> 30b770753547
[2021-08-25 16:30:06,892] {bash_operator.py:113} INFO - Tmp dir root location: 
 /tmp
[2021-08-25 16:30:06,893] {bash_operator.py:136} INFO - Temporary script location: /tmp/airflowtmpot_xsukw/task2xo4uxspu
[2021-08-25 16:30:06,893] {bash_operator.py:146} INFO - Running command: echo Im running task 2, the current execution date is 2021-08-25 and the previous execution date is 2021-08-25
[2021-08-25 16:30:06,901] {bash_operator.py:153} INFO - Output:
[2021-08-25 16:30:06,902] {bash_operator.py:157} INFO - Im running task 2, the current execution date is 2021-08-25 and the previous execution date is 2021-08-25
[2021-08-25 16:30:06,902] {bash_operator.py:161} INFO - Command exited with return code 0
[2021-08-25 16:30:06,913] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=dag1, task_id=task2, execution_date=20210825T162941, start_date=20210825T163006, end_date=20210825T163006
[2021-08-25 16:30:16,800] {logging_mixin.py:112} INFO - [2021-08-25 16:30:16,799] {local_task_job.py:103} INFO - Task exited with return code 0

Note how the tasks are executed twice.

Why do the tasks run twice? Is there a way to import tasks like this?

Note 1: I'm not allowed to use SubDAGs.

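Edit: added the execution tree (image).

Edit 2: In case anyone runs into this problem, I figured it out.

The problem was that I was running the DAG with an external trigger, which deletes the DAG and relaunches it. So the DAG ran for the external trigger, but the scheduler also saw that the DAG had not run for the month, so it scheduled an execution, resulting in 2 runs.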
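The log above seems consistent with this: each task appears once per execution date, and there are two execution dates. A small sketch (not part of the original setup) that groups the four "Executing" lines copied from that log by execution date, using only the standard library:

```python
import re
from collections import defaultdict

# The four "Executing" lines, copied verbatim from the log above.
LOG_LINES = [
    "[2021-08-25 16:29:46,955] {taskinstance.py:900} INFO - Executing <Task(BashOperator): task1> on 2021-07-01T06:00:00+00:00",
    "[2021-08-25 16:29:55,363] {taskinstance.py:900} INFO - Executing <Task(BashOperator): task1> on 2021-08-25T16:29:41+00:00",
    "[2021-08-25 16:29:55,370] {taskinstance.py:900} INFO - Executing <Task(BashOperator): task2> on 2021-07-01T06:00:00+00:00",
    "[2021-08-25 16:30:06,822] {taskinstance.py:900} INFO - Executing <Task(BashOperator): task2> on 2021-08-25T16:29:41+00:00",
]

# Captures the task ID and the execution date of each task instance.
PATTERN = re.compile(r"Executing <Task\(\w+\): (\w+)> on (\S+)")


def group_by_execution_date(lines):
    """Map each execution date to the task IDs that ran for it."""
    runs = defaultdict(list)
    for line in lines:
        match = PATTERN.search(line)
        if match:
            task_id, execution_date = match.groups()
            runs[execution_date].append(task_id)
    return dict(runs)


runs = group_by_execution_date(LOG_LINES)
for execution_date, tasks in sorted(runs.items()):
    print(execution_date, tasks)
```

The two keys are 2021-07-01T06:00:00+00:00, which looks like the monthly scheduled run, and 2021-08-25T16:29:41+00:00, which falls a few seconds before the log timestamps and so fits a manual trigger. Within each run, task1 and task2 each executed only once.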

The solution I found was:

This lets the scheduler do its job, and the DAG runs as it should.

Tags: airflow
