首页 > 解决方案 > 气流:如何将变量 obtenida de mi DB 传递给 SimpleHttpOperator 函数

问题描述

我从气流开始。我需要从我的 PostgreSQL 数据库中获取访问令牌,然后我必须使用该访问令牌通过 SimpleHttpOperator 函数查询 API。

这是我的代码:

from airflow.models import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python import PythonOperator

from datetime import datetime
import json


default_args = {
    'start_date':datetime(2021, 1, 1)
}

def _get_access_token():
    request = "SELECT access_token FROM access_token"
    postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
    connection = postgres_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(request)
    jobs = cursor.fetchall()
    access_token = ([i[0] for i in jobs])

    return access_token


with DAG('get_broadworks_subscribers', schedule_interval='@once',
    default_args = default_args,
    catchup=False) as dag:

    # Tasks

    get_access_token = PythonOperator(
    task_id='get_access_token', 
    python_callable=_get_access_token
    )

    get_subscribers_list = SimpleHttpOperator(
        task_id = 'get_subscribers_list',
        http_conn_id = 'webex',
        endpoint = 'v1/broadworks/subscribers/',
        method = 'GET',
        authorization = "Bearer" + " " + access_token[0],
        headers = {
            "Authorization": "authorization"
        },
        response_filter = lambda response: json.loads(response.text),
        log_response = True
    )

get_access_token >> get_subscribers_list

我收到以下错误:

    authorization = "Bearer" + " " + access_token[0],
NameError: name 'access_token' is not defined

我希望你能帮我一把,非常感谢你提前。

标签: pythonairflow

解决方案


您可能期望 python 函数将返回值,以便稍后在您的代码中使用。这不是气流的工作方式。任务之间不共享数据任务可以通过Xcom共享元数据。

PythonOperator返回值被推送到 xcom(元存储中的表)。然后下游任务可以读取该值并在该字段被模板化时使用它。中也没有authorization参数SimpleHttpOperator

所以你的代码可以是这样的:

get_subscribers_list = SimpleHttpOperator(
    task_id = 'get_subscribers_list',
    http_conn_id = 'webex',
    endpoint = 'v1/broadworks/subscribers/',
    method = 'GET',
    headers = {
        "Authorization": """Bearer {{ task_instance.xcom_pull(task_ids="get_access_token") }} """
    },
    response_filter = lambda response: json.loads(response.text),
    log_response = True
)

由于headers模板化的,您可以从上游任务中提取 xcom 值。

注意:我不建议传递这样的令牌。您可能需要考虑将其安全地存储在 Airflow Variable 中。它还可以省去您在单独的任务中从数据库中查询它的麻烦。如果您将它存储在变量中,您需要更改的是:

    headers = {
        "Authorization": """Bearer {{ var.value.get('my_var_name') }} """
    }

请注意,如果键包含任何值,Airflow 会自动屏蔽值,'password', 'secret', 'passwd', 'authorization', 'api_key', 'apikey', 'access_token'但是如果您选择使用不包含任何键的键,如果您将字符串添加到sensitive_var_conn_namesairflow.cfg 中,则仍可以隐藏它以获取有关此内容的更多信息,请参阅文档。_


推荐阅读