首页 > 解决方案 > 如何动态创建具有不同参数的运算符

问题描述

我有以下代码:

def chunck_import(**kwargs):
    ...
    logging.info('Number of pages required is: {0}'.format(num_pages))
    for i in range(1, num_pages + 1):
        ...
        parameter_where = 'where orders_id between {0} and {1}'.format(start,end)
        logging.info(parameter_where)

chunck_import_op = PythonOperator(
    task_id='chunck_import',
    provide_context=True,
    python_callable=chunck_import,
    dag=dag)


start_task_op >> ... >>  chunck_import_op

此运算符创建多个WHERE语句:

INFO - From 7557920 to 7793493
INFO - Number of pages required is: 4
where orders_id between 7607920 and 7657920
where orders_id between 7657921 and 7707920
where orders_id between 7707921 and 7757920
where orders_id between 7757921 and 7793493

现在,我有一个MySqlToGoogleCloudStorageOperator如下:

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    ...
    sql = 'select * from orders {{ params.where_cluster }}',
    params={'where_cluster': parameter_where},
    dag=dag) 

chunck_import_op知道我需要调用的次数-MySqlToGoogleCloudStorageOperator 它还num_pages创建了我需要作为参数传递的字符串 -parameter_where

我的问题是如何动态创建并传递MySqlToGoogleCloudStorageOperator给它。num_pagesparameter_where

标签: airflow

解决方案


我将子类MySqlToGoogleCloudStorageOperator化以自定义查询并覆盖执行步骤以根据传递给操作员的页面大小参数生成分页查询。这是一些额外的工作,但建议在此处的其他选项中使用。

但是,您不能让PythonOperator或任何操作员修改 DAG(并使其被识别和安排)。它最多可以做的事情之一是:

  1. 构建 where 子句后,MySqlToGoogleCloudStorageOperator用该子句构建 a 并在PythonOperator. 这将起作用,您将在's 的日志中看到MySqlToGoogleCloudStorageOperator右侧的日志消息。PythonOperator
  2. 使用PythonOperatororTriggerDagRunOperator触发另一个 DAG,只需MySqlToGoogleCloudStorageOperator传入子句作为参数,或者首先将其推送到 XCOM 以用于该 DAG。其他 DAG 可能应该将 Schedule 设置为@None. 这将使跟踪日志变得更加困难,但它可以并行运行 DAG。

如果它是我的 DAG,我认为我的方法(如果不是子类化)将始终在 1 到 X 页中处理。让我们建议您的 DAG 应该处理最多 X 页的结果,例如 X 为 10。然后定义 10 个分支chunck_import_op的父级。您将不需要chunck_import_op或可调用的。

  • 每个分支都以 a 开头,它使用不同的参数(0 到 9)ShortCircuitOperator调用相同的可调用对象。offset此可调用将检查是否offset * page_size大于end,如果是则返回False,跳过其下游运算符。否则,它将向 xcom 推送一个基于偏移量范围的有效查询并返回True以运行它们。
  • 每个分支都以 a 继续,MySqlToGoogleCloudStorageOperator其中将查询设置为{{ ti.xcom_pull('<ShortCircuitOperator_N>') }}字符串是前一个的名称ShortCircuitOperator
  • 如果您在MySqlToGoogleCloudStorageOperators 之后需要其他运算符,首先将 a 添加DummyOperator为所有这些MySqlToGoogleCloudStorageOperators的子代trigger_rule ALL_DONE,然后将 s 添加为该运算符的子代。

这样,您可以在必要时运行 1 到 10 个分页查询。虽然它们可能并行运行,但我不认为这是一个潜在的问题,只是考虑一下。


推荐阅读