首页 > 解决方案 > 如何在非气流运算符 python 函数中访问 Xcom 值

问题描述

我有一个存储的 XCom 值,我想将它传递给另一个不使用 PythonOperator 调用的 python 函数。


def sql_file_template():
    <some code which uses xcom variable>

def call_stored_proc(**kwargs):
        
        #project = kwargs['row_id']
        print("INSIDE CALL STORE PROC ------------")   
        query = """CALL `{0}.dataset_name.store_proc`(
                          '{1}' # source table
                        , ['{2}'] # row_ids
                        , '{3}' # pivot_col_name   
                        , '{4}' # pivot_col_value
                        ,  100 # max_columns
                        , 'MAX' # aggregation
                );"""
        query = query.format(kwargs['project'],kwargs['source_tbl'] ,kwargs['row_id'],kwargs['pivot_col'],kwargs['pivot_val'])
        job = client.query(query, location="US")
        for result in job.result():
            task_instance = kwargs['task_instance']
            task_instance.xcom_push(key='query_string', value=result) 
                print result
                return result



bq_cmd = PythonOperator (
    task_id=                    'task1'
    provide_context=            True,
    python_callable=            call_stored_proc,
    op_kwargs=                  {'project'        : project,
                                 'source_tbl'     : source_tbl,
                                 'row_id'         : row_id,
                                 'pivot_col'      : pivot_col,
                                 'pivot_val'      : pivot_val
                                },
    dag=                        dag
)

dummy_operator >> bq_cmd
sql_file_template()

存储过程的输出是一个使用 xcom 捕获的字符串。

现在我想在不使用 PythonOperator 的情况下将此值传递给一些 python 函数sql_file_template

根据 Airflow 文档,只能在任务之间访问 xcom。

有人可以帮忙吗?

标签: airflow

解决方案


如果您有权访问要查询的 Airflow 安装(配置、数据库访问和代码),则可以使用 Airflow 的airflow.models.XCom:get_one类方法:

from datetime import datetime

from airflow.models import XCom


execution_date = datetime(2020, 8, 28)
xcom_value = XCom.get_one(execution_date=execution_date,
                          task_id="the_task_id",
                          dag_id="the_dag_id")            

推荐阅读