首页 > 解决方案 > 将 OracleOperator 的输出发送到 Airflow 中的另一个任务

问题描述

我需要在另一个任务中使用 oracleOperator 的输出以进一步执行。我遇到的麻烦是,当我将数据拉到另一个任务中并打印它时,它给出的结果为无。没有抛出错误,但没有传递数据。此外,任务 UI 中的xcom选项卡显示键和值的空白。

我的代码如下:

from airflow import DAG
from airflow.operators.oracle_operator import OracleOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'xyz',
    'start_date': days_ago(2),
}

dag = DAG('example_xcom', schedule_interval="@once", default_args=args, tags=['example'])


def puller(**kwargs):
    ti = kwargs['ti']
    # get value_1
    pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
    print("VALUE IN PULLER ")
    print(pulled_value_1)

pull = PythonOperator(
    task_id='pullee',
    dag=dag,
    python_callable=puller,
    provide_context=True,
)
push = OracleOperator(
    task_id='data',
    sql='SELECT * FROM CUSTOMERS', 
    oracle_conn_id='1',
    provide_context=True,
    dag=dag,
)


push>>pull

标签: airflow

解决方案


您可以使用以下代码。基本上使用PythonOperatorwith OracleHook

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.oracle_hook import OracleHook
from airflow.utils.dates import days_ago

args = {
    'owner': 'xyz',
    'start_date': days_ago(2),
}

dag = DAG('example_xcom', schedule_interval="@once", default_args=args, tags=['example'])


def puller(**kwargs):
    ti = kwargs['ti']
    # get value_1
    pulled_value_1 = ti.xcom_pull(task_ids='data')
    print("VALUE IN PULLER : ", pulled_value_1)


def get_data_from_oracle(**kwargs):
    oracle_hook = OracleHook(oracle_conn_id=kwargs['oracle_conn_id'])
    return oracle_hook.get_records(sql=kwargs['sql'])

push = PythonOperator(
    task_id='data',
    op_kwargs={'oracle_conn_id': 'oracle_conn_id', 'sql': 'SELECT * FROM CUSTOMERS'}
    provide_context=True,
    python_callable=get_data_from_oracle,
    dag=dag,
)

pull = PythonOperator(
    task_id='pullee',
    dag=dag,
    python_callable=puller,
    provide_context=True,
)


push >> pull


推荐阅读