首页 > 解决方案 > 如何通过多处理将许多文档(1 亿)插入本地 mongodb?

问题描述

我成功地将许多 JSON 文件(仅选择的键)插入到本地 MongoDB。但是,当一个集合有超过 1 亿行需要插入时,我的代码似乎很慢。我希望多处理能帮助加快这个过程,但我想不出没有任何冲突的正确方法。这是我没有多处理的代码:

import json
import os

from pymongo import MongoClient

client = MongoClient('localhost', 27017)
db = client[db_name]


# get file list
def log_list(log_folder):
    log_file = list()
    for entry in os.listdir(log_folder):
        if os.path.isfile(os.path.join(log_folder, entry)):
            log_path = os.path.join(log_folder, entry)
            log_file.append(log_path)
    return log_file


def func():
    collection = db[collection_name]
    print('loading folder_name')
    root = folder_path
    nfile = 0
    nrow = 0
    # insert data
    files = log_list(root)
    files.sort()
    for file in files:
        with open(file, 'r') as f:
            nfile += 1
            table = [json.loads(line) for line in f]
        for row in table:
            nrow += 1
            entry = {'timestamp': row['@timestamp'], 'user_id': row['user']['id'], 'action': row['@type']}
            collection.insert_one(entry).inserted_id
    client.close()
    print(nfile, 'file(s) processed.', nrow, 'row(s) loaded.')

标签: jsonpython-3.xmongodbmultiprocessingpymongo

解决方案


我们在我们的项目中这样做,用户为某些任务上传大量文件,我们使用 Celery 使用分布式任务队列来处理它。

由于这是一个类似的异步任务,'Celery' 在这里可以做得很好,它被设计为拾取任务,然后在单独的进程中执行。

  1. 创建任务
  2. 设置代理(如 redis)
  3. 在另一个终端或后台运行 celery
  4. 发送任务(参见 task_name.apply_async() 或 task_name.delay() )

https://docs.celeryproject.org/en/latest/index.html


推荐阅读