首页 > 解决方案 > 如何为气流任务生成不同的 ID?

问题描述

我正在尝试使用@task注释时间调用函数,但如果我尝试多次调用它,则N无法定义使用此装饰器的方法:task_id

airflow.exceptions.DuplicateTaskIdFound:任务 id 'my_task_group.make_request__1' 已添加到 DAG

@task
def make_request(params):
    return true

def my_first_function():
    # do stuff
    return make_request(params)

def my_second_function():
    # do stuff
    return make_request(params)

for i in range(0, 10)
    first = my_first_function()  # this will call "make_request"
    second = my_second_function()  # this will also call "make_request"

    first >> second

如何在注释上“重命名”task_id动态?@task

标签: pythonairflow

解决方案


使用@task允许task_id通过调用装饰函数来动态生成。国家的文档_get_unique_task_id

在给定 DAG(或者如果在 DAG 上下文中运行)的情况下生成唯一的任务 ID 通过在原始任务 ID 的末尾附加一个唯一的数字来生成 ID。示例:task_id task_id__1 task_id__2 ... task_id__20

使用此功能,无需动态“重命名”任务。在您的代码示例中,您应该装饰在循环中被调用的函数。这是一个正在运行的 2.0.1 版本示例:

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.operators.python import get_current_context

default_args = {
    'owner': 'airflow',
}

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(1), catchup=False, tags=['example'])
def task_decorator_example():

    def make_request(params):
        print(f"Params: {params}")

    def _print_task_id():
        context = get_current_context()
        print(f"Result: {context['ti'].task_id}")

    @task
    def my_first_function():
        _print_task_id()
        context = get_current_context()
        return make_request(context['params'])

    @task
    def my_second_function():
        _print_task_id()
        params = {'foo': 'bar'}
        return make_request(params)

    for i in range(0, 3):
        first = my_first_function()  # this will call "make_request"
        second = my_second_function()  # this will also call "make_request"

        first >> second


example_decorated_dag = task_decorator_example()

创建此图表视图:

图视图

每个任务都会打印出来task_idparams合并后的日志输出如下:

- my_first_function
{logging_mixin.py:104} INFO - Result: my_first_function
{logging_mixin.py:104} INFO - Params: {}
- my_second_function
{logging_mixin.py:104} INFO - Result: my_second_function
{logging_mixin.py:104} INFO - Params: {'foo': 'bar'}
- my_first_function__1
{logging_mixin.py:104} INFO - Result: my_first_function__1
{logging_mixin.py:104} INFO - Params: {}
- my_second_function__1
{logging_mixin.py:104} INFO - Result: my_second_function__1
{logging_mixin.py:104} INFO - Params: {'foo': 'bar'}
- my_first_function__2
{logging_mixin.py:104} INFO - Result: my_first_function__2
{logging_mixin.py:104} INFO - Params: {}

希望对你有用!


推荐阅读