airflow - 使用 MySqlToGoogleCloudStorageOperator 对记录进行分页
问题描述
我的工作流程是:
- 我正在从变量中获取我们当前拥有的最大订单 ID
LAST_IMPORTED_ORDER_ID
- 我得到了
order_id
最大值MySQL
数据库中得到最大值 - 将订单导入到使用中的
LAST_IMPORTED_ORDER_ID
值之间xcom
MySQL
MySqlToGoogleCloudStorageOperator
到目前为止一切都很好,而且效果很好。
然而问题是当值之间的差距太大时。可以是500K
订单。一次导入这么多记录是不可能的。
MySqlToGoogleCloudStorageOperator
有能力将保存在存储中的文件分解成块approx_max_file_size_bytes
它没有能力对查询本身进行分块。
基本上我想做的是对查询使用分页之类的东西。如果
xcom_order_id - LAST_IMPORTED_ORDER_ID > 50K
然后打破最多 50K 行的查询,这意味着我需要动态创建运算符。
这就是我试图做的:
LAST_IMPORTED_ORDER_ID = Variable.get("last_order_id_imported")
start_task_op = DummyOperator(task_id='start_task', dag=dag)
def chunck_import(**kwargs):
ti = kwargs['ti']
xcom = int(ti.xcom_pull(task_ids='get_max_order_id_2_months_ago'))
current = int(LAST_IMPORTED_ORDER_ID)
if xcom - current < 50000:
num_pages = 1
else:
num_pages = int((xcom / current) + 1)
logging.info(xcom)
logging.info(current)
for i in range(1, num_pages + 1): #for 1 page its range(1,2)
start = LAST_IMPORTED_ORDER_ID * i
end = start + 50000
if end > xcom:
end = xcom
import_orders_op = MySqlToGoogleCloudStorageOperator(
task_id='import_orders_and_upload_to_storage_orders-{}'.format(i),
mysql_conn_id='mysqlcon',
google_cloud_storage_conn_id='googlecon',
provide_context=True,
approx_max_file_size_bytes=100000000,
sql='select * from e.orders where orders_id between {{ params.start }} and {{ params.end }}',
params={'start': start, 'end': end},
bucket=GCS_BUCKET_ID,
filename=file_name_orders,
dag=dag)
chunck_import_op = PythonOperator(
task_id='chunck_import',
provide_context=True,
python_callable=chunck_import,
dag=dag)
start_task_op >> get_max_order_id_2_months_ago >> chunck_import_op
这没有错误,它运行成功,但它什么也不做。
中的值XCOM
是正确的。但chunck_import_op
不做任何事情。我也没有MySqlToGoogleCloudStorageOperator
在我的 UI 中看到动态创建的:
另请注意,print num_pages
我也没有在日志中看到该值。
我怎样才能解决这个问题?
解决方案
对您来说不幸的是,操作员无法修改其所在的 DAG。由于您只能在操作员执行中拉取 xcom,因此我建议您不要将操作员添加到 DAG,而是在循环结束时设置操作员,在循环内,调用:
import_orders_op.pre_execute(**kwargs)
import_orders_op.execute(**kwargs)
这有点笨拙,因为所有日志输出都将在chunck_import
您可能希望为自己在逻辑上重命名的任务中(import_in_chunks
?),但它应该可以工作,并且您的 DAG 不会更改每次运行的确切任务数。
或者,我认为这甚至更复杂,假设最大数量的块,为每个基于块的范围设置一对ShortCircuitOperator
和。MySqlToGoogleCloudStorageOperator
应该检查块的ShortCircuitOperator
起始范围是否有效,如果有效则运行 sql 2 gcs op,否则短路。
更好的方法是将 MySqlToGoogleCloudStorageOperator 子类化为 PagedMySqlToGCSOperator,覆盖execute
,_query_mysql
和_write_local_data_files
. 不过工作量更大。
推荐阅读
- javascript - Angular 如何在桌面上以一种方式显示页面,在响应模式下以另一种方式在移动设备上显示页面
- android - 为什么上下文菜单不适用于 android kotlin?
- shopify - Shopfiy Liquid 通过终端“跳过”更新
- javascript - 在一行中的元素内创建复选框
- javascript - 在 css 中导航侧边菜单汉堡图标时的设计问题
- javascript - 在 Node.js 中抓取时的多个值
- node.js - 错误:尝试连接 AWS RDS (Postgres) 时连接 ECONNREFUSED 127.0.0.1:5432
- apache-spark - json 文件到 pyspark 数据帧
- matplotlib - 将误差线添加到 seaborn 散点图和线图组合
- python - 在 LDAvis 中,红色条高于蓝色条