python - 气流:将参数从 python 函数传递给 MySQL Operator
问题描述
我想要做什么:
- 比较 MySQL 和 BigQuery 中缺少的 ID
- 从 MySQL 中获取所有数据,其中 id 为缺少的 ID
table = 'orders'
def get_missing_ids():
aws_id = get_aws_id(table)
bq_id = get_bq_id(table)
missing_id = [np.setdiff1d(aws_id,bq_id)]
missing_ids = ', '.join(map(str,missing_id))
return missing_ids
missing_ids = get_missing_ids()
get_missing_data = MysqlToGCS(
task_id = 'get_orders_from_aws',
sql = """select *
from orders
where id in ({{params.missing_ids}})""",
params = {'missing_ids':missing_ids},
bucket = 'airflow_bucket',
filename = 'data/orders/db-orders{{ds}}{}',
mysql_conn_id = 'aws_readreplica',
approx_max_file_size_bytes = 100000000,
google_cloud_storage_conn_id = 'google_cloud_storage_default',
dag=dag)
def print_done():
print("done boiiiii")
time.sleep(60)
task = PythonOperator(
task_id='done',
python_callable=print_done,
dag=dag)
task.set_upstream(get_missing_data)
我阅读了有关 Xcom 的信息,但我不明白如何在这里实现它。
解决方案
最近,我正在研究气流主题并处理不同的数据库。所以我想我应该能够分享一些经验。
气流有用的概念: DAG/任务:您可以在气流管理 web->dag 页面中查看和跟踪。
变量:在气流系统级别的不同 dag 中设置和获取全局参数 Xcome:在某个 dag 级别的不同任务中设置和获取参数。Python Operator:可以是任务实例。DB 操作员/模型:可以是 Python 函数中的任务实例或对象。
- 在我的情况下,我只使用 python 运算符,并且在 python 运算符相关函数内部使用了与 db 相关的运算符。
3.在你的情况下,你可以用下面的伪代码来做:
from airflow import DAG
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator as mysqltogcs
from datetime import timedelta
table = 'orders'
===============
def get_missing_ids(ds, **kwargs):
ti = kwargs['ti']
aws_id = get_aws_id(table)
bq_id = get_bq_id(table)
missing_id = [np.setdiff1d(aws_id,bq_id)]
missing_ids = ', '.join(map(str,missing_id))
ti.xcom_push(key='missing_ids', value=missing_ids)
================
def get_orders_from_aws(ds, **kwargs):
missing_ids = ti.xcom_pull(key='missing_ids', task_ids='get_missing_ids')
sql = f"select * from orders where id in ({missing_ids})"
MG = mysqltogcs(sql=sql,
bucket = 'airflow_bucket',
filename = 'data/orders/db-orders{{ds}}{}',
mysql_conn_id = 'aws_readreplica',
approx_max_file_size_bytes = 100000000,
google_cloud_storage_conn_id = 'google_cloud_storage_default'
)
missing_data = MG.execute()
def print_done():
print("done boiiiii")
time.sleep(60)
===============
with DAG(dag_id="your_name", schedule_interval='timedelta(minute=5)') as dag:
task_1 = PythonOperator( task_id ="get_missing_ids",
python_callable=get_missing_ids,
provide_context=True)
task_2 = PythonOperator( task_id = 'get_orders_from_aws',
python_callable=get_orders_from_aws,
provide_context=True)
task_3 = PythonOperator( task_id='done',
python_callable=print_done)
task_1 >> task_3 >>task_3
推荐阅读
- php - Laravel,MYSQL 两个聚合表上的完全连接解决方法
- wordpress - 仅在主页上显示来自特定类别的最近帖子
- javascript - React Native:将 useState() 数据传递到不相关的屏幕
- java - 闰年Java程序
- node.js - 构建环回 3 项目
- wordpress - ERRORED_DOCUMENT_REQUEST
- keras - ValueError:形状 (None, 6) 和 (None, 5) 不兼容
- c# - 在全屏 UWP 应用程序前启动外部进程
- python - Python Docker容器,找不到模块
- kotlin - 协程在调度程序上运行,即使它的线程被关闭