airflow - Airflow:如何在不同的 dags 中使用相同的任务
问题描述
我正在学习气流并遇到问题。
我有 2 个任务,我想在几个 dags 中使用它们。这些任务之间的区别仅在于操作员将获得的参数。
这可以通过简单地将任务复制并粘贴到所有 dag 中来完成,但维护这种类型的代码将是一场噩梦。所以想要做的是创建一个包含我将多次调用的任务的类,然后从 dags 中导入这个类。
我用一个最小的例子复制了这个问题。
这是该类的代码:
from airflow.operators.bash_operator import BashOperator
class Operator_generator():
_instance = None
def __init__(self, var1, var2):
self.var1 = var1
self.var2 = var2
def create_task_1(self):
return BashOperator(
task_id='task1',
bash_command='echo Im running task 1, the current execution date is {{ds}} and the previous execution date is {{prev_ds}}'
)
def create_task_2(self):
return BashOperator(
task_id='task2',
bash_command='echo Im running task 2, the current execution date is {{ds}} and the previous execution date is {{prev_ds}}'
)
这是一个 dag 示例,我将在其中导入该类
from include.src.date.decorator import DefaultDateTime
from airflow import DAG
from include.src.airflow.xcom import cleanup
from operator_creator import Operator_generator
dag_id = "dag1"
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": DefaultDateTime(2021, 6, 1),
'retries': 1
}
# Dag definition
with DAG(
dag_id,
schedule_interval='@monthly',
catchup=False,
on_failure_callback=cleanup,
on_success_callback=cleanup
) as dag:
dag.doc_md = __doc__
operator_generator = Operator_generator('var1','var2')
task1 = operator_generator.create_task_1()
task2 = operator_generator.create_task_2()
task1 >> task2
请注意,“var1”和“var2”是我需要对运算符进行参数化的变量。
问题是,当我运行 dag 时,任务会运行两次:
[2021-08-25 16:29:46,937] {taskinstance.py:880} INFO - Starting attempt 1 of 2
[2021-08-25 16:29:46,937] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2021-08-25 16:29:46,955] {taskinstance.py:900} INFO - Executing <Task(BashOperator): task1> on 2021-07-01T06:00:00+00:00
[2021-08-25 16:29:46,961] {standard_task_runner.py:53} INFO - Started process 67689 to run task
[2021-08-25 16:29:47,011] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: dag1.task1 2021-07-01T06:00:00+00:00 [running]> 30b770753547
[2021-08-25 16:29:47,032] {bash_operator.py:113} INFO - Tmp dir root location:
/tmp
[2021-08-25 16:29:47,033] {bash_operator.py:136} INFO - Temporary script location: /tmp/airflowtmpixijgd4s/task1lfcwdvfa
[2021-08-25 16:29:47,033] {bash_operator.py:146} INFO - Running command: echo Im running task 1, the current execution date is 2021-07-01 and the previous execution date is 2021-06-01
[2021-08-25 16:29:47,039] {bash_operator.py:153} INFO - Output:
[2021-08-25 16:29:47,040] {bash_operator.py:157} INFO - Im running task 1, the current execution date is 2021-07-01 and the previous execution date is 2021-06-01
[2021-08-25 16:29:47,040] {bash_operator.py:161} INFO - Command exited with return code 0
[2021-08-25 16:29:47,052] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=dag1, task_id=task1, execution_date=20210701T060000, start_date=20210825T162946, end_date=20210825T162947
[2021-08-25 16:29:55,335] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: dag1.task1 2021-08-25T16:29:41+00:00 [queued]>
[2021-08-25 16:29:55,335] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: dag1.task2 2021-07-01T06:00:00+00:00 [queued]>
[2021-08-25 16:29:55,348] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: dag1.task1 2021-08-25T16:29:41+00:00 [queued]>
[2021-08-25 16:29:55,348] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2021-08-25 16:29:55,348] {taskinstance.py:880} INFO - Starting attempt 1 of 2
[2021-08-25 16:29:55,348] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2021-08-25 16:29:55,357] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: dag1.task2 2021-07-01T06:00:00+00:00 [queued]>
[2021-08-25 16:29:55,357] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2021-08-25 16:29:55,357] {taskinstance.py:880} INFO - Starting attempt 1 of 2
[2021-08-25 16:29:55,357] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2021-08-25 16:29:55,363] {taskinstance.py:900} INFO - Executing <Task(BashOperator): task1> on 2021-08-25T16:29:41+00:00
[2021-08-25 16:29:55,366] {standard_task_runner.py:53} INFO - Started process 67809 to run task
[2021-08-25 16:29:55,370] {taskinstance.py:900} INFO - Executing <Task(BashOperator): task2> on 2021-07-01T06:00:00+00:00
[2021-08-25 16:29:55,374] {standard_task_runner.py:53} INFO - Started process 67810 to run task
[2021-08-25 16:29:55,412] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: dag1.task1 2021-08-25T16:29:41+00:00 [running]> 30b770753547
[2021-08-25 16:29:55,422] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: dag1.task2 2021-07-01T06:00:00+00:00 [running]> 30b770753547
[2021-08-25 16:29:55,430] {bash_operator.py:113} INFO - Tmp dir root location:
/tmp
[2021-08-25 16:29:55,432] {bash_operator.py:136} INFO - Temporary script location: /tmp/airflowtmpsacovlfm/task1doc6fakb
[2021-08-25 16:29:55,432] {bash_operator.py:146} INFO - Running command: echo Im running task 1, the current execution date is 2021-08-25 and the previous execution date is 2021-08-25
[2021-08-25 16:29:55,440] {bash_operator.py:153} INFO - Output:
[2021-08-25 16:29:55,440] {bash_operator.py:157} INFO - Im running task 1, the current execution date is 2021-08-25 and the previous execution date is 2021-08-25
[2021-08-25 16:29:55,441] {bash_operator.py:161} INFO - Command exited with return code 0
[2021-08-25 16:29:55,444] {bash_operator.py:113} INFO - Tmp dir root location:
/tmp
[2021-08-25 16:29:55,445] {bash_operator.py:136} INFO - Temporary script location: /tmp/airflowtmpyqqww8an/task2i29a2lk7
[2021-08-25 16:29:55,445] {bash_operator.py:146} INFO - Running command: echo Im running task 2, the current execution date is 2021-07-01 and the previous execution date is 2021-06-01
[2021-08-25 16:29:55,451] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=dag1, task_id=task1, execution_date=20210825T162941, start_date=20210825T162955, end_date=20210825T162955
[2021-08-25 16:29:55,453] {bash_operator.py:153} INFO - Output:
[2021-08-25 16:29:55,453] {bash_operator.py:157} INFO - Im running task 2, the current execution date is 2021-07-01 and the previous execution date is 2021-06-01
[2021-08-25 16:29:55,454] {bash_operator.py:161} INFO - Command exited with return code 0
[2021-08-25 16:29:55,465] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=dag1, task_id=task2, execution_date=20210701T060000, start_date=20210825T162955, end_date=20210825T162955
[2021-08-25 16:29:56,922] {logging_mixin.py:112} INFO - [2021-08-25 16:29:56,921] {local_task_job.py:103} INFO - Task exited with return code 0
[2021-08-25 16:30:05,333] {logging_mixin.py:112} INFO - [2021-08-25 16:30:05,333] {local_task_job.py:103} INFO - Task exited with return code 0
[2021-08-25 16:30:05,337] {logging_mixin.py:112} INFO - [2021-08-25 16:30:05,337] {local_task_job.py:103} INFO - Task exited with return code 0
[2021-08-25 16:30:06,794] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: dag1.task2 2021-08-25T16:29:41+00:00 [queued]>
[2021-08-25 16:30:06,809] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: dag1.task2 2021-08-25T16:29:41+00:00 [queued]>
[2021-08-25 16:30:06,809] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2021-08-25 16:30:06,810] {taskinstance.py:880} INFO - Starting attempt 1 of 2
[2021-08-25 16:30:06,810] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2021-08-25 16:30:06,822] {taskinstance.py:900} INFO - Executing <Task(BashOperator): task2> on 2021-08-25T16:29:41+00:00
[2021-08-25 16:30:06,826] {standard_task_runner.py:53} INFO - Started process 67937 to run task
[2021-08-25 16:30:06,875] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: dag1.task2 2021-08-25T16:29:41+00:00 [running]> 30b770753547
[2021-08-25 16:30:06,892] {bash_operator.py:113} INFO - Tmp dir root location:
/tmp
[2021-08-25 16:30:06,893] {bash_operator.py:136} INFO - Temporary script location: /tmp/airflowtmpot_xsukw/task2xo4uxspu
[2021-08-25 16:30:06,893] {bash_operator.py:146} INFO - Running command: echo Im running task 2, the current execution date is 2021-08-25 and the previous execution date is 2021-08-25
[2021-08-25 16:30:06,901] {bash_operator.py:153} INFO - Output:
[2021-08-25 16:30:06,902] {bash_operator.py:157} INFO - Im running task 2, the current execution date is 2021-08-25 and the previous execution date is 2021-08-25
[2021-08-25 16:30:06,902] {bash_operator.py:161} INFO - Command exited with return code 0
[2021-08-25 16:30:06,913] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=dag1, task_id=task2, execution_date=20210825T162941, start_date=20210825T163006, end_date=20210825T163006
[2021-08-25 16:30:16,800] {logging_mixin.py:112} INFO - [2021-08-25 16:30:16,799] {local_task_job.py:103} INFO - Task exited with return code 0
注意任务是如何执行 2 次的:
- 在第一次执行中,{{ds}} 和 {{prev_ds}} 的值是当前日期。
- 在第二次执行中,{{ds}} 和 {{prev_ds}} 的值对应于每月间隔。
为什么任务运行 2 次?有没有办法导入这样的任务?
注意 1:我不允许使用 subdags。
编辑2:如果有人遇到这个问题,我想通了。
问题是我正在使用外部触发器运行 dag,它会删除 dag 并重新启动它。所以 dag 为外部触发器运行,但调度程序也看到 dag 没有运行一个月,所以它调度执行,导致 2 次运行。
我找到的解决方案是:
- 关闭气流接口中的dag
- 删除 dag(dag 最右侧的红色 x)
- 刷新页面
- dag 再次出现在列表中,打开它
这将使调度程序完成其工作,并且 dag 将按应有的方式运行。
解决方案
推荐阅读
- java - 如何在keycloak中获取用户手机号码
- opengl - 如何在不更改绘制顺序的情况下绘制具有 alpha 纹理的图像
- c++ - 如何将int16数组更改为向量
- bash - 如何通过将特定单词指定为 grep 的输入来检查特定单词
- asp.net - 我如何同时使用 TModel 和自定义模型或可以替代?
- android - 将android平台添加到ionic时出错
- javascript - 如何获取两个日期之间的数据
- xcode - Xcode:如何将框架搜索路径添加到搜索路径
- ios - 向下滚动时如何在 UITableView 中更改此 UIView 的背景颜色
- angular - Angular Rxjs 多个并行 HTTP 请求/调用阻塞导航 - 优先取消队列并发