python - 气流:如何将变量 obtenida de mi DB 传递给 SimpleHttpOperator 函数
问题描述
我从气流开始。我需要从我的 PostgreSQL 数据库中获取访问令牌,然后我必须使用该访问令牌通过 SimpleHttpOperator 函数查询 API。
这是我的代码:
from airflow.models import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python import PythonOperator
from datetime import datetime
import json
default_args = {
'start_date':datetime(2021, 1, 1)
}
def _get_access_token():
request = "SELECT access_token FROM access_token"
postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
connection = postgres_hook.get_conn()
cursor = connection.cursor()
cursor.execute(request)
jobs = cursor.fetchall()
access_token = ([i[0] for i in jobs])
return access_token
with DAG('get_broadworks_subscribers', schedule_interval='@once',
default_args = default_args,
catchup=False) as dag:
# Tasks
get_access_token = PythonOperator(
task_id='get_access_token',
python_callable=_get_access_token
)
get_subscribers_list = SimpleHttpOperator(
task_id = 'get_subscribers_list',
http_conn_id = 'webex',
endpoint = 'v1/broadworks/subscribers/',
method = 'GET',
authorization = "Bearer" + " " + access_token[0],
headers = {
"Authorization": "authorization"
},
response_filter = lambda response: json.loads(response.text),
log_response = True
)
get_access_token >> get_subscribers_list
我收到以下错误:
authorization = "Bearer" + " " + access_token[0],
NameError: name 'access_token' is not defined
我希望你能帮我一把,非常感谢你提前。
解决方案
您可能期望 python 函数将返回值,以便稍后在您的代码中使用。这不是气流的工作方式。任务之间不共享数据任务可以通过Xcom共享元数据。
PythonOperator
返回值被推送到 xcom(元存储中的表)。然后下游任务可以读取该值并在该字段被模板化时使用它。中也没有authorization
参数SimpleHttpOperator
。
所以你的代码可以是这样的:
get_subscribers_list = SimpleHttpOperator(
task_id = 'get_subscribers_list',
http_conn_id = 'webex',
endpoint = 'v1/broadworks/subscribers/',
method = 'GET',
headers = {
"Authorization": """Bearer {{ task_instance.xcom_pull(task_ids="get_access_token") }} """
},
response_filter = lambda response: json.loads(response.text),
log_response = True
)
由于headers
是模板化的,您可以从上游任务中提取 xcom 值。
注意:我不建议传递这样的令牌。您可能需要考虑将其安全地存储在 Airflow Variable 中。它还可以省去您在单独的任务中从数据库中查询它的麻烦。如果您将它存储在变量中,您需要更改的是:
headers = {
"Authorization": """Bearer {{ var.value.get('my_var_name') }} """
}
请注意,如果键包含任何值,Airflow 会自动屏蔽值,'password', 'secret', 'passwd', 'authorization', 'api_key', 'apikey', 'access_token'
但是如果您选择使用不包含任何键的键,如果您将字符串添加到sensitive_var_conn_names
airflow.cfg 中,则仍可以隐藏它以获取有关此内容的更多信息,请参阅文档。_
推荐阅读
- angular - 服务类中的 RX/Js 可观察问题
- azure-function-app - 在 VNet 中自动缩放 Azure 函数
- google-apps-script - 复制+粘贴显示的值 Google 表格
- javascript - NestJS/Fastify Cookie 未处理的承诺拒绝警告
- oauth-2.0 - Keycloak - 合并具有相同电子邮件的用户
- java - OpenCV + OpenGL:复制 gl 纹理,使用 OpenCV 修改并渲染
- java - 带有构造函数的基类的子类中的强制构造函数
- c++ - 如何从构造函数副本 T(const T&) 调用对象 T?
- azure-devops - TF400813: 用户 '' 无权访问此资源
- scala - 在scala中,如何对内联表达式进行模式匹配?