首页 > 解决方案 > 开发 DAG,将大量 csv 文件并行上传到数据库

问题描述

我想开发 DAG,它将大量的 csv 文件并行上传到 Clickhouse,所以如果有任何建议怎么做?

这是最好的方法,还是有另一种方法?

现在我有一个 DAG 原型,它根据行数从 csv 创建块并动态生成 DAG 任务。(因此每次生成取决于 csv 文件大小的不同任务计数)

def generate_chunks(*args, **kwargs):
    with open(kwargs['file_path'], 'r') as f:
        reader = csv.reader(f)

        # TODO: chunk size from Variables
        chunks = list(gen_chunks(reader, chunk_size=10000))
        chunks_count = len(chunks)

        # TODO: variable name from main_dag_id
        os.system(f'airflow variables --set WebsiteStatisticsChunks {chunks_count}')

        for i in range(chunks_count):
            reset_tasks_status('chunks_' + str(i), str(kwargs['execution_date']))

        return chunks
    WebsiteStatisticsChunks = Variable.get("WebsiteStatisticsChunks")
    logging.info("The current WebsiteStatisticsChunks value is " + str(WebsiteStatisticsChunks))

    for index in range(int(WebsiteStatisticsChunks)):
        dynamicTask = PythonOperator(
            task_id='chunks_' + str(index),
            dag=dag,
            provide_context=True,
            python_callable=do_some_work,
            op_args=['chunks', index],
            op_kwargs={

                'chunks': "{{ task_instance.xcom_pull(task_ids='create_chunks') }}",
                'file_path': "{{ task_instance.xcom_pull(task_ids='extract_cards_plugin') }}",
                'clickhouse_conn_id': 'clickhouse_test',
                'database': 'dev_test',
                'table': 'pg_raw_cms_cards',
            },
        )
        create_chunks.set_downstream(dynamicTask)
        dynamicTask.set_downstream(ending_task)

标签: airflow

解决方案


推荐阅读