python - 气流运算符和 dags 并正确返回、公开和访问值?
问题描述
我需要创建一个气流运算符,它接受一些输入并返回一个字符串,该字符串将用作下一个将运行的另一个运算符的输入。我是气流 dags 和操作员的新手,对如何正确执行此操作感到困惑。由于我是为使用气流和构建 dags 的人构建这个,而我不是真正的气流用户或 dag 开发人员,我想就如何正确地进行操作获得建议。我创建了一个运算符,它返回一个令牌(只是一个字符串,所以 hello world 运算符示例工作正常)。这样做我看到了 dag 执行的 xcom 值中的值。但是我将如何正确检索该值并将其输入到下一个运算符中?对于我的示例,我只是调用了相同的运算符,但实际上它将调用不同的运算符。我只是不知道如何正确编码。我只是将代码添加到 dag 中吗?操作员是否需要添加代码?还是应该有别的东西?
示例达格:
import logging
import os
from airflow import DAG
from airflow.utils.dates import days_ago
from custom_operators.hello_world import HelloWorldOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
}
dag = DAG("hello_world_test",
description='Testing out a operator',
start_date=days_ago(1),
schedule_interval=None,
catchup=False,
default_args=default_args)
get_token = HelloWorldOperator(
task_id='hello_world_task_1',
name='My input to generate a token',
dag=dag
)
token = "My token" # Want this to be the return value from get_token
run_this = HelloWorldOperator(
task_id='hello_world_task_2',
name=token,
dag=dag
)
logging.info("Start")
get_token >> run_this
logging.info("End")
Hello World 运营商:
from airflow.models.baseoperator import BaseOperator
class HelloWorldOperator(BaseOperator):
def __init__(
self,
some_input: str,
**kwargs) -> None:
super().__init__(**kwargs)
self.some_input = some_input
def execute(self, context):
# Bunch of business logic
token = "MyGeneratedToken"
return token
解决方案
这是一个好的开始:)。
从另一个任务中检索令牌的正确方法是使用 jinja 模板
run_this = RetrieveToken(
task_id='hello_world_task_2',
retrieved_token="{{ ti.xcom_pull(task_ids=[\'hello_world_task_1\']) }}'",
dag=dag
)
但是,您必须记住在 RetrieveToken 中添加retrieved_token
到template_fields
数组: https ://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html#templating
您还可以xcom_pull
在“检索”运算符中显式调用方法,并将“来源”任务 ID 传递给运算符以从正确的任务中检索它。
推荐阅读
- ios - 我可以使用谷歌地图中的位置来旋转汽车吗?
- c++ - 获取基类的更简单方法
- python - Python:TypeError:'float'对象不可迭代
- python - 在 Python Pandas 中,如何搜索列元素是否包含前 2 位数字
- django - DRF IntegrityError:NOT NULL 约束失败:user_id
- sql - 如何在 Using 语句中关闭 sqldatareader?
- asp.net - AJAX UpdateProgress 不显示
- python - django python中的ORM'like'查询
- google-apps-script - 多个收件人 SendGrid Google Apps 脚本
- c# - GetHostEntry 返回本地计算机名而不是 DNS 名称