首页 > 解决方案 > 使用 python 多处理从 mongodb 读取和删除

问题描述

我有一个带有一个集合incoming和一个集合的 mongodb target。一个工作进程当前正在执行以下(简化):

def worker(number):
    incomings = db.incoming.find()
    buffersize=5
    readcounter=0
    for incoming in incomings:
        readcounter+=1
        documentToInsert={'keyfield':incoming["keyfield"], +other fields after some treatments...}
        documentsToInsert.append(incoming)
        documentToDelete={'_id':incoming["_id"]}
        documentsToDelete.append(documentToDelete)
        if readcounter >= readbuffer:
            readcounter=0
            db.incoming.remove({'_id': { '$in': [ObjectId(docs["_id"]) for docs in documentsToDelete]}})
            db.target.insert_many([docs for docs in documentsToInsert],ordered=False)

当然 remove 和 insert_many 语句都被try/except.

由于数据的输入速度比由/一个工作人员处理的速度快,因此我需要变得更快,例如通过在所有 cpu 上生成它,这无论如何都应该发生以提高效率。我通过以下代码做到这一点:

if __name__== "__main__":
    procs=[]
    number=0
    for cpu in range(multiprocessing.cpu_count()):
        procs.append(multiprocessing.Process(target = worker, args = (number,)))
        number+=1
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
    print("=====================FIN=========================")

问题是当一个线程正在读取buffersize文档时,其他线程获取相同的文档,导致只有一个线程成功插入target,其他线程产生重复键异常的困境。这种效果只使一个进程有用。如果没有多线程,remove/insert_many 组合工作正常,我可以轻松地使用更高的缓冲区大小。

我曾考虑将数据插入incoming一个额外的字段以使 worker 合格number,但这会占用额外的磁盘空间并消耗额外的处理,另外,在生成时,我不知道有多少 worker 将在数据。

我已经尝试在每个线程中随机休眠一个时间,但这完全无法预测,并且本身并不能防止错误。

我该怎么做才能让所有线程处理不同的数据?

标签: pythonmongodbmultithreadingpymongopython-multiprocessing

解决方案


根据我的评论,我认为使用 RabbitMQ 之类的消息代理最适合您的用例。使用 RabbitMQ 和类似的消息代理(我没有使用 0mq),您不需要提供其他线程,只需启动所需数量的线程,每个线程都订阅,然后代理将依次传递消息。


推荐阅读