airflow - 开发 DAG,将大量 csv 文件并行上传到数据库
问题描述
我想开发 DAG,它将大量的 csv 文件并行上传到 Clickhouse,所以如果有任何建议怎么做?
这是最好的方法,还是有另一种方法?
现在我有一个 DAG 原型,它根据行数从 csv 创建块并动态生成 DAG 任务。(因此每次生成取决于 csv 文件大小的不同任务计数)
def generate_chunks(*args, **kwargs):
with open(kwargs['file_path'], 'r') as f:
reader = csv.reader(f)
# TODO: chunk size from Variables
chunks = list(gen_chunks(reader, chunk_size=10000))
chunks_count = len(chunks)
# TODO: variable name from main_dag_id
os.system(f'airflow variables --set WebsiteStatisticsChunks {chunks_count}')
for i in range(chunks_count):
reset_tasks_status('chunks_' + str(i), str(kwargs['execution_date']))
return chunks
WebsiteStatisticsChunks = Variable.get("WebsiteStatisticsChunks")
logging.info("The current WebsiteStatisticsChunks value is " + str(WebsiteStatisticsChunks))
for index in range(int(WebsiteStatisticsChunks)):
dynamicTask = PythonOperator(
task_id='chunks_' + str(index),
dag=dag,
provide_context=True,
python_callable=do_some_work,
op_args=['chunks', index],
op_kwargs={
'chunks': "{{ task_instance.xcom_pull(task_ids='create_chunks') }}",
'file_path': "{{ task_instance.xcom_pull(task_ids='extract_cards_plugin') }}",
'clickhouse_conn_id': 'clickhouse_test',
'database': 'dev_test',
'table': 'pg_raw_cms_cards',
},
)
create_chunks.set_downstream(dynamicTask)
dynamicTask.set_downstream(ending_task)
解决方案
推荐阅读
- android - lambda 在 onclick 监听器中的使用
- image - Flutter 图像加载和缓存
- salesforce - 如何根据新潜在客户和现有潜在客户在页面布局中显示警告消息
- ionic4 - ListDir 仅包含 pdf 文件
- sql-server - 基于背景颜色 ssrs 的总和
- clojure - clojure ogre 中的文本和字符串搜索以查询 Janusgraph - 找不到任何函数或方法
- angular - 如何将js转换为内容浏览器的Ts?
- batch-file - 如何生成 00 和 FF 之间但没有相等字符组合的随机值?
- azure - 是否可以通过 Az Cmdlet 以编程方式路由 Slot 流量?
- python - 消除 CSV 文件中不需要的换行符