airflow - 如何在非气流运算符 python 函数中访问 Xcom 值
问题描述
我有一个存储的 XCom 值,我想将它传递给另一个不使用 PythonOperator 调用的 python 函数。
def sql_file_template():
<some code which uses xcom variable>
def call_stored_proc(**kwargs):
#project = kwargs['row_id']
print("INSIDE CALL STORE PROC ------------")
query = """CALL `{0}.dataset_name.store_proc`(
'{1}' # source table
, ['{2}'] # row_ids
, '{3}' # pivot_col_name
, '{4}' # pivot_col_value
, 100 # max_columns
, 'MAX' # aggregation
);"""
query = query.format(kwargs['project'],kwargs['source_tbl'] ,kwargs['row_id'],kwargs['pivot_col'],kwargs['pivot_val'])
job = client.query(query, location="US")
for result in job.result():
task_instance = kwargs['task_instance']
task_instance.xcom_push(key='query_string', value=result)
print result
return result
bq_cmd = PythonOperator (
task_id= 'task1'
provide_context= True,
python_callable= call_stored_proc,
op_kwargs= {'project' : project,
'source_tbl' : source_tbl,
'row_id' : row_id,
'pivot_col' : pivot_col,
'pivot_val' : pivot_val
},
dag= dag
)
dummy_operator >> bq_cmd
sql_file_template()
存储过程的输出是一个使用 xcom 捕获的字符串。
现在我想在不使用 PythonOperator 的情况下将此值传递给一些 python 函数sql_file_template。
根据 Airflow 文档,只能在任务之间访问 xcom。
有人可以帮忙吗?
解决方案
如果您有权访问要查询的 Airflow 安装(配置、数据库访问和代码),则可以使用 Airflow 的airflow.models.XCom:get_one
类方法:
from datetime import datetime
from airflow.models import XCom
execution_date = datetime(2020, 8, 28)
xcom_value = XCom.get_one(execution_date=execution_date,
task_id="the_task_id",
dag_id="the_dag_id")
推荐阅读
- c# - 如何防止“拉绳”被拆除?
- python - 是否可以使用 lua 或 python 访问 Windows 应用程序 gui?
- jobs - 从后台杀死工作
- django - 为什么不鼓励使用 django 在生产中提供静态文件?
- awk - awk 不能同时打印一个数组和一个位置参数
- google-cloud-functions - 通过 Google Cloud Functions 发送的电子邮件触发语法错误
- android - @Parcelize 未解决,androidExtensions 实验模式设置为 true
- java - 删除 Firebase 中的一行后如何在 Recycleview adaper 中刷新数据
- vue.js - Vuex 中的 Nuxt.generate()
- r - 在 bookdown 中自定义人物名称(就像章节名称一样)