首页 > 解决方案 > 气流运算符和 dags 并正确返回、公开和访问值?

问题描述

我需要创建一个气流运算符,它接受一些输入并返回一个字符串,该字符串将用作下一个将运行的另一个运算符的输入。我是气流 dags 和操作员的新手,对如何正确执行此操作感到困惑。由于我是为使用气流和构建 dags 的人构建这个,而我不是真正的气流用户或 dag 开发人员,我想就如何正确地进行操作获得建议。我创建了一个运算符,它返回一个令牌(只是一个字符串,所以 hello world 运算符示例工作正常)。这样做我看到了 dag 执行的 xcom 值中的值。但是我将如何正确检索该值并将其输入到下一个运算符中?对于我的示例,我只是调用了相同的运算符,但实际上它将调用不同的运算符。我只是不知道如何正确编码。我只是将代码添加到 dag 中吗?操作员是否需要添加代码?还是应该有别的东西?

示例达格:

import logging
import os

from airflow import DAG
from airflow.utils.dates import days_ago
from custom_operators.hello_world import HelloWorldOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

dag = DAG("hello_world_test",
    description='Testing out a operator',
    start_date=days_ago(1),
    schedule_interval=None,
    catchup=False,
    default_args=default_args)

get_token = HelloWorldOperator(
    task_id='hello_world_task_1',
    name='My input to generate a token',
    dag=dag
)

token = "My token" # Want this to be the return value from get_token

run_this = HelloWorldOperator(
    task_id='hello_world_task_2',
    name=token,
    dag=dag
)

logging.info("Start")
get_token >> run_this
logging.info("End")

Hello World 运营商:

from airflow.models.baseoperator import BaseOperator

class HelloWorldOperator(BaseOperator):

    def __init__(
            self,
            some_input: str,
            **kwargs) -> None:
        super().__init__(**kwargs)
        self.some_input = some_input

    def execute(self, context):
        # Bunch of business logic
        token = "MyGeneratedToken"
        return token

标签: pythonairflow

解决方案


这是一个好的开始:)。

从另一个任务中检索令牌的正确方法是使用 jinja 模板

run_this = RetrieveToken(
    task_id='hello_world_task_2',
    retrieved_token="{{ ti.xcom_pull(task_ids=[\'hello_world_task_1\']) }}'",
    dag=dag
)

但是,您必须记住在 RetrieveToken 中添加retrieved_tokentemplate_fields数组: https ://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html#templating

您还可以xcom_pull在“检索”运算符中显式调用方法,并将“来源”任务 ID 传递给运算符以从正确的任务中检索它。


推荐阅读