airflow - 将 OracleOperator 的输出发送到 Airflow 中的另一个任务
问题描述
我需要在另一个任务中使用 oracleOperator 的输出以进一步执行。我遇到的麻烦是,当我将数据拉到另一个任务中并打印它时,它给出的结果为无。没有抛出错误,但没有传递数据。此外,任务 UI 中的xcom选项卡显示键和值的空白。
我的代码如下:
from airflow import DAG
from airflow.operators.oracle_operator import OracleOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
args = {
'owner': 'xyz',
'start_date': days_ago(2),
}
dag = DAG('example_xcom', schedule_interval="@once", default_args=args, tags=['example'])
def puller(**kwargs):
ti = kwargs['ti']
# get value_1
pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
print("VALUE IN PULLER ")
print(pulled_value_1)
pull = PythonOperator(
task_id='pullee',
dag=dag,
python_callable=puller,
provide_context=True,
)
push = OracleOperator(
task_id='data',
sql='SELECT * FROM CUSTOMERS',
oracle_conn_id='1',
provide_context=True,
dag=dag,
)
push>>pull
解决方案
您可以使用以下代码。基本上使用PythonOperator
with OracleHook
。
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.oracle_hook import OracleHook
from airflow.utils.dates import days_ago
args = {
'owner': 'xyz',
'start_date': days_ago(2),
}
dag = DAG('example_xcom', schedule_interval="@once", default_args=args, tags=['example'])
def puller(**kwargs):
ti = kwargs['ti']
# get value_1
pulled_value_1 = ti.xcom_pull(task_ids='data')
print("VALUE IN PULLER : ", pulled_value_1)
def get_data_from_oracle(**kwargs):
oracle_hook = OracleHook(oracle_conn_id=kwargs['oracle_conn_id'])
return oracle_hook.get_records(sql=kwargs['sql'])
push = PythonOperator(
task_id='data',
op_kwargs={'oracle_conn_id': 'oracle_conn_id', 'sql': 'SELECT * FROM CUSTOMERS'}
provide_context=True,
python_callable=get_data_from_oracle,
dag=dag,
)
pull = PythonOperator(
task_id='pullee',
dag=dag,
python_callable=puller,
provide_context=True,
)
push >> pull
推荐阅读
- dart - 测试使用插件和平台通道的 Flutter 代码
- c# - 异步删除显示数据的问题
- .net-core - EF Core 引用同一张表,单个集合
- reactjs - 在无状态组件中获取 ref
- javascript - 与 ng-options 中的硬编码 json 相比,解析我的动态 json 时出错
- google-cloud-platform - 如何在没有 ?authuser=1 的情况下获得经过身份验证的 GCS URL
- swift - 运行应用程序时,表格视图单元上未显示 Swift 按钮
- amazon-web-services - AWS Classic Load Balancer 随机不接收请求
- python - 用于单词计数、平均单词长度、单词频率和以字母开头的单词频率的 Python 程序
- android - (Mockito + Kotlin) Android 测试中的 eq() 和 any() 返回 null