首页 > 解决方案 > 气流:将参数从 python 函数传递给 MySQL Operator

问题描述

我想要做什么:
- 比较 MySQL 和 BigQuery 中缺少的 ID
- 从 MySQL 中获取所有数据,其中 id 为缺少的 ID

table = 'orders'

def get_missing_ids():
    aws_id = get_aws_id(table)
    bq_id = get_bq_id(table)
    missing_id = [np.setdiff1d(aws_id,bq_id)]
    missing_ids = ', '.join(map(str,missing_id))
    return missing_ids

missing_ids = get_missing_ids()

get_missing_data = MysqlToGCS(
    task_id = 'get_orders_from_aws',
    sql = """select *
        from orders 
        where id in ({{params.missing_ids}})""",
    params = {'missing_ids':missing_ids},
    bucket = 'airflow_bucket',
    filename = 'data/orders/db-orders{{ds}}{}',
    mysql_conn_id = 'aws_readreplica',
    approx_max_file_size_bytes = 100000000,
    google_cloud_storage_conn_id = 'google_cloud_storage_default',
    dag=dag)

def print_done():
    print("done boiiiii")
    time.sleep(60)

task = PythonOperator(
        task_id='done',
        python_callable=print_done,
        dag=dag)

task.set_upstream(get_missing_data)

我阅读了有关 Xcom 的信息,但我不明白如何在这里实现它。

标签: pythonmysqlgoogle-cloud-storageairflowgoogle-cloud-composer

解决方案


最近,我正在研究气流主题并处理不同的数据库。所以我想我应该能够分享一些经验。

  1. 气流有用的概念: DAG/任务:您可以在气流管理 web->dag 页面中查看和跟踪。

    变量:在气流系统级别的不同 dag 中设置和获取全局参数 Xcome:某个 dag 级别的不同任务中设置和获取参数。Python Operator:可以是任务实例。DB 操作员/模型:可以是 Python 函数中的任务实例或对象。

  2. 在我的情况下,我只使用 python 运算符,并且在 python 运算符相关函数内部使用了与 db 相关的运算符。

3.在你的情况下,你可以用下面的伪代码来做:

from airflow import DAG

from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator  as mysqltogcs

from datetime import timedelta

table = 'orders'

===============

def get_missing_ids(ds, **kwargs):
    ti = kwargs['ti']
    aws_id = get_aws_id(table)
    bq_id = get_bq_id(table)
    missing_id = [np.setdiff1d(aws_id,bq_id)]
    missing_ids = ', '.join(map(str,missing_id))
    ti.xcom_push(key='missing_ids', value=missing_ids)

================

def get_orders_from_aws(ds, **kwargs):
    missing_ids = ti.xcom_pull(key='missing_ids', task_ids='get_missing_ids')
    sql = f"select *  from orders where id in ({missing_ids})"
    MG = mysqltogcs(sql=sql,
                    bucket = 'airflow_bucket',
                    filename = 'data/orders/db-orders{{ds}}{}',
                    mysql_conn_id = 'aws_readreplica',
                    approx_max_file_size_bytes = 100000000,
                    google_cloud_storage_conn_id = 'google_cloud_storage_default'
                   )
    missing_data = MG.execute()




def print_done():
    print("done boiiiii")
    time.sleep(60)

===============

with DAG(dag_id="your_name", schedule_interval='timedelta(minute=5)') as dag:

        task_1 = PythonOperator( task_id ="get_missing_ids",
                        python_callable=get_missing_ids,
                        provide_context=True)

        task_2 = PythonOperator( task_id = 'get_orders_from_aws',
                        python_callable=get_orders_from_aws,
                        provide_context=True)

        task_3 = PythonOperator( task_id='done',
                        python_callable=print_done)

        task_1 >> task_3 >>task_3

推荐阅读