json - 如何通过多处理将许多文档(1 亿)插入本地 mongodb?
问题描述
我成功地将许多 JSON 文件(仅选择的键)插入到本地 MongoDB。但是,当一个集合有超过 1 亿行需要插入时,我的代码似乎很慢。我希望多处理能帮助加快这个过程,但我想不出没有任何冲突的正确方法。这是我没有多处理的代码:
import json
import os
from pymongo import MongoClient
client = MongoClient('localhost', 27017)
db = client[db_name]
# get file list
def log_list(log_folder):
log_file = list()
for entry in os.listdir(log_folder):
if os.path.isfile(os.path.join(log_folder, entry)):
log_path = os.path.join(log_folder, entry)
log_file.append(log_path)
return log_file
def func():
collection = db[collection_name]
print('loading folder_name')
root = folder_path
nfile = 0
nrow = 0
# insert data
files = log_list(root)
files.sort()
for file in files:
with open(file, 'r') as f:
nfile += 1
table = [json.loads(line) for line in f]
for row in table:
nrow += 1
entry = {'timestamp': row['@timestamp'], 'user_id': row['user']['id'], 'action': row['@type']}
collection.insert_one(entry).inserted_id
client.close()
print(nfile, 'file(s) processed.', nrow, 'row(s) loaded.')
解决方案
我们在我们的项目中这样做,用户为某些任务上传大量文件,我们使用 Celery 使用分布式任务队列来处理它。
由于这是一个类似的异步任务,'Celery' 在这里可以做得很好,它被设计为拾取任务,然后在单独的进程中执行。
- 创建任务
- 设置代理(如 redis)
- 在另一个终端或后台运行 celery
- 发送任务(参见 task_name.apply_async() 或 task_name.delay() )
推荐阅读
- c# - 使用泛型添加 HTTP 客户端 - ASP.NET Core
- scripting - 用 tcl 处理大文件
- sql-server - 可选列上的 SQL Server 索引
- vbscript - 使用 fso.GetAbsolutePathName(".") 检查后如何设置正确的语法 if (fso.FileExists(""))
- javascript - Universal-Link / Deeplink 在 Safari 中打开,而不是在 iPhone 上安装 YouTube-App
- python - 为什么我的 sklearn 混淆矩阵和 plot_confusion_matrix 的值不相等?
- python - python pygame鼠标事件不起作用并且不会引发错误
- angular - ng 测试 - Chrome 没有在 60000 毫秒内捕获,正在杀死
- docker - 在docker中运行命令后我无法输入,但如果我手动执行它就可以
- laravel - 从 Laravel Nova Action 发送通知