首页 > 解决方案 > 使用 MySqlToGoogleCloudStorageOperator 对记录进行分页

问题描述

我的工作流程是:

  1. 我正在从变量中获取我们当前拥有的最大订单 IDLAST_IMPORTED_ORDER_ID
  2. 我得到了order_id最大值MySQL数据库中得到最大值
  3. 将订单导入到使用中的LAST_IMPORTED_ORDER_ID 值之间xcomMySQLMySqlToGoogleCloudStorageOperator

到目前为止一切都很好,而且效果很好。

然而问题是当值之间的差距太大时。可以是500K订单。一次导入这么多记录是不可能的。

MySqlToGoogleCloudStorageOperator有能力将保存在存储中的文件分解成块approx_max_file_size_bytes它没有能力对查询本身进行分块。

基本上我想做的是对查询使用分页之类的东西。如果
xcom_order_id - LAST_IMPORTED_ORDER_ID > 50K然后打破最多 50K 行的查询,这意味着我需要动态创建运算符。

这就是我试图做的:

LAST_IMPORTED_ORDER_ID = Variable.get("last_order_id_imported")

start_task_op = DummyOperator(task_id='start_task', dag=dag)

def chunck_import(**kwargs):
    ti = kwargs['ti']
    xcom = int(ti.xcom_pull(task_ids='get_max_order_id_2_months_ago'))
    current = int(LAST_IMPORTED_ORDER_ID)
    if xcom - current < 50000:
        num_pages = 1
    else:
        num_pages = int((xcom / current) + 1)
    logging.info(xcom)
    logging.info(current)
    for i in range(1, num_pages + 1):  #for 1 page its range(1,2)
        start = LAST_IMPORTED_ORDER_ID * i
        end = start + 50000
        if end > xcom:
            end = xcom
        import_orders_op = MySqlToGoogleCloudStorageOperator(
            task_id='import_orders_and_upload_to_storage_orders-{}'.format(i),
            mysql_conn_id='mysqlcon',
            google_cloud_storage_conn_id='googlecon',
            provide_context=True,
            approx_max_file_size_bytes=100000000,
            sql='select * from e.orders where orders_id between {{ params.start }} and {{ params.end }}',
            params={'start': start, 'end': end},
            bucket=GCS_BUCKET_ID,
            filename=file_name_orders,
            dag=dag)


chunck_import_op = PythonOperator(
    task_id='chunck_import',
    provide_context=True,
    python_callable=chunck_import,
    dag=dag)

start_task_op >> get_max_order_id_2_months_ago >> chunck_import_op

这没有错误,它运行成功,但它什么也不做。

中的值XCOM是正确的。但chunck_import_op 不做任何事情。我也没有MySqlToGoogleCloudStorageOperator在我的 UI 中看到动态创建的:

在此处输入图像描述

另请注意,print num_pages我也没有在日志中看到该值。

我怎样才能解决这个问题?

标签: airflow

解决方案


对您来说不幸的是,操作员无法修改其所在的 DAG。由于您只能在操作员执行中拉取 xcom,因此我建议您不要将操作员添加到 DAG,而是在循环结束时设置操作员,在循环内,调用:

import_orders_op.pre_execute(**kwargs)
import_orders_op.execute(**kwargs)

这有点笨拙,因为所有日志输出都将在chunck_import您可能希望为自己在逻辑上重命名的任务中(import_in_chunks?),但它应该可以工作,并且您的 DAG 不会更改每次运行的确切任务数。

或者,我认为这甚至更复杂,假设最大数量的块,为每个基于块的范围设置一对ShortCircuitOperator和。MySqlToGoogleCloudStorageOperator应该检查块的ShortCircuitOperator起始范围是否有效,如果有效则运行 sql 2 gcs op,否则短路。

更好的方法是将 MySqlToGoogleCloudStorageOperator 子类化为 PagedMySqlToGCSOperator,覆盖execute,_query_mysql_write_local_data_files. 不过工作量更大。


推荐阅读