airflow - 如何动态创建具有不同参数的运算符
问题描述
我有以下代码:
def chunck_import(**kwargs):
...
logging.info('Number of pages required is: {0}'.format(num_pages))
for i in range(1, num_pages + 1):
...
parameter_where = 'where orders_id between {0} and {1}'.format(start,end)
logging.info(parameter_where)
chunck_import_op = PythonOperator(
task_id='chunck_import',
provide_context=True,
python_callable=chunck_import,
dag=dag)
start_task_op >> ... >> chunck_import_op
此运算符创建多个WHERE
语句:
INFO - From 7557920 to 7793493
INFO - Number of pages required is: 4
where orders_id between 7607920 and 7657920
where orders_id between 7657921 and 7707920
where orders_id between 7707921 and 7757920
where orders_id between 7757921 and 7793493
现在,我有一个MySqlToGoogleCloudStorageOperator
如下:
import_orders_op = MySqlToGoogleCloudStorageOperator(
task_id='import_orders',
...
sql = 'select * from orders {{ params.where_cluster }}',
params={'where_cluster': parameter_where},
dag=dag)
chunck_import_op
知道我需要调用的次数-MySqlToGoogleCloudStorageOperator
它还num_pages
创建了我需要作为参数传递的字符串 -parameter_where
我的问题是如何动态创建并传递MySqlToGoogleCloudStorageOperator
给它。num_pages
parameter_where
解决方案
我将子类MySqlToGoogleCloudStorageOperator
化以自定义查询并覆盖执行步骤以根据传递给操作员的页面大小参数生成分页查询。这是一些额外的工作,但建议在此处的其他选项中使用。
但是,您不能让PythonOperator
或任何操作员修改 DAG(并使其被识别和安排)。它最多可以做的事情之一是:
- 构建 where 子句后,
MySqlToGoogleCloudStorageOperator
用该子句构建 a 并在PythonOperator
. 这将起作用,您将在's 的日志中看到MySqlToGoogleCloudStorageOperator
右侧的日志消息。PythonOperator
- 使用
PythonOperator
orTriggerDagRunOperator
触发另一个 DAG,只需MySqlToGoogleCloudStorageOperator
传入子句作为参数,或者首先将其推送到 XCOM 以用于该 DAG。其他 DAG 可能应该将 Schedule 设置为@None
. 这将使跟踪日志变得更加困难,但它可以并行运行 DAG。
如果它是我的 DAG,我认为我的方法(如果不是子类化)将始终在 1 到 X 页中处理。让我们建议您的 DAG 应该处理最多 X 页的结果,例如 X 为 10。然后定义 10 个分支chunck_import_op
的父级。您将不需要chunck_import_op
或可调用的。
- 每个分支都以 a 开头,它使用不同的参数(0 到 9)
ShortCircuitOperator
调用相同的可调用对象。offset
此可调用将检查是否offset * page_size
大于end
,如果是则返回False
,跳过其下游运算符。否则,它将向 xcom 推送一个基于偏移量范围的有效查询并返回True
以运行它们。 - 每个分支都以 a 继续,
MySqlToGoogleCloudStorageOperator
其中将查询设置为{{ ti.xcom_pull('<ShortCircuitOperator_N>') }}
字符串是前一个的名称ShortCircuitOperator
。 - 如果您在
MySqlToGoogleCloudStorageOperator
s 之后需要其他运算符,首先将 a 添加DummyOperator
为所有这些MySqlToGoogleCloudStorageOperator
s的子代trigger_rule
ALL_DONE
,然后将 s 添加为该运算符的子代。
这样,您可以在必要时运行 1 到 10 个分页查询。虽然它们可能并行运行,但我不认为这是一个潜在的问题,只是考虑一下。
推荐阅读
- c# - 绑定 URL (http//image) imagebrush datatemplate WPF
- excel - 如何找到 Excel 的正式文档?
- vue.js - 如果有 *any* 错误,Vue 将不会渲染任何内容
- c++ - 如何正确地将 cv::Mat 转换为具有完美匹配值的 torch::Tensor?
- django - 无法解决 Git 错误“拒绝合并不相关的历史”
- javascript - 如何根据文件名排序文件?
- jquery - 引导模式弹出框有问题
- javascript - 将对象存储到 Javascript/Nodejs 中的文件中
- algorithm - 给定数字 n,如何打印大小为 m 的所有子序列?
- docker - 码头网络?它是持久的吗?