python - 如何为气流任务生成不同的 ID?
问题描述
我正在尝试使用@task
注释时间调用函数,但如果我尝试多次调用它,则N
无法定义使用此装饰器的方法:task_id
airflow.exceptions.DuplicateTaskIdFound:任务 id 'my_task_group.make_request__1' 已添加到 DAG
@task
def make_request(params):
return true
def my_first_function():
# do stuff
return make_request(params)
def my_second_function():
# do stuff
return make_request(params)
for i in range(0, 10)
first = my_first_function() # this will call "make_request"
second = my_second_function() # this will also call "make_request"
first >> second
如何在注释上“重命名”task_id
动态?@task
解决方案
使用@task
允许task_id
通过调用装饰函数来动态生成。国家的文档_get_unique_task_id
:
在给定 DAG(或者如果在 DAG 上下文中运行)的情况下生成唯一的任务 ID 通过在原始任务 ID 的末尾附加一个唯一的数字来生成 ID。示例:task_id task_id__1 task_id__2 ... task_id__20
使用此功能,无需动态“重命名”任务。在您的代码示例中,您应该装饰在循环中被调用的函数。这是一个正在运行的 2.0.1 版本示例:
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.operators.python import get_current_context
default_args = {
'owner': 'airflow',
}
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(1), catchup=False, tags=['example'])
def task_decorator_example():
def make_request(params):
print(f"Params: {params}")
def _print_task_id():
context = get_current_context()
print(f"Result: {context['ti'].task_id}")
@task
def my_first_function():
_print_task_id()
context = get_current_context()
return make_request(context['params'])
@task
def my_second_function():
_print_task_id()
params = {'foo': 'bar'}
return make_request(params)
for i in range(0, 3):
first = my_first_function() # this will call "make_request"
second = my_second_function() # this will also call "make_request"
first >> second
example_decorated_dag = task_decorator_example()
创建此图表视图:
每个任务都会打印出来task_id
,params
合并后的日志输出如下:
- my_first_function
{logging_mixin.py:104} INFO - Result: my_first_function
{logging_mixin.py:104} INFO - Params: {}
- my_second_function
{logging_mixin.py:104} INFO - Result: my_second_function
{logging_mixin.py:104} INFO - Params: {'foo': 'bar'}
- my_first_function__1
{logging_mixin.py:104} INFO - Result: my_first_function__1
{logging_mixin.py:104} INFO - Params: {}
- my_second_function__1
{logging_mixin.py:104} INFO - Result: my_second_function__1
{logging_mixin.py:104} INFO - Params: {'foo': 'bar'}
- my_first_function__2
{logging_mixin.py:104} INFO - Result: my_first_function__2
{logging_mixin.py:104} INFO - Params: {}
希望对你有用!
推荐阅读
- c# - 如何显示 Windows 窗体后面的内容?
- javascript - 两个运动物体的交集
- regex - 如何从句子中提取时间并转换为数字?
- javascript - JS 如何评估表达式 ({...}).objMethod()?
- mysql - 为什么“SELECT 'ä' = 'ae' COLLATE latin1_german2_ci;”的结果 是 1?
- javascript - 如何修复“加载页面时打开的侧边栏”?
- c# - 如何从 Listbox.SelectedItem 获取数据源的不同成员
- python-3.x - 在 gpu 上编译并使用 opencv 使用 python 进行图像处理
- scala - 请帮忙解释一个Scala构造函数和伴生对象的案例
- python - Django中的Faker:'Generator'的实例没有'name'memberpylint(no-member)