python - 使用 python 多处理从 mongodb 读取和删除
问题描述
我有一个带有一个集合incoming
和一个集合的 mongodb target
。一个工作进程当前正在执行以下(简化):
def worker(number):
incomings = db.incoming.find()
buffersize=5
readcounter=0
for incoming in incomings:
readcounter+=1
documentToInsert={'keyfield':incoming["keyfield"], +other fields after some treatments...}
documentsToInsert.append(incoming)
documentToDelete={'_id':incoming["_id"]}
documentsToDelete.append(documentToDelete)
if readcounter >= readbuffer:
readcounter=0
db.incoming.remove({'_id': { '$in': [ObjectId(docs["_id"]) for docs in documentsToDelete]}})
db.target.insert_many([docs for docs in documentsToInsert],ordered=False)
当然 remove 和 insert_many 语句都被try/except
.
由于数据的输入速度比由/一个工作人员处理的速度快,因此我需要变得更快,例如通过在所有 cpu 上生成它,这无论如何都应该发生以提高效率。我通过以下代码做到这一点:
if __name__== "__main__":
procs=[]
number=0
for cpu in range(multiprocessing.cpu_count()):
procs.append(multiprocessing.Process(target = worker, args = (number,)))
number+=1
for proc in procs:
proc.start()
for proc in procs:
proc.join()
print("=====================FIN=========================")
问题是当一个线程正在读取buffersize
文档时,其他线程获取相同的文档,导致只有一个线程成功插入target
,其他线程产生重复键异常的困境。这种效果只使一个进程有用。如果没有多线程,remove/insert_many 组合工作正常,我可以轻松地使用更高的缓冲区大小。
我曾考虑将数据插入incoming
一个额外的字段以使 worker 合格number
,但这会占用额外的磁盘空间并消耗额外的处理,另外,在生成时,我不知道有多少 worker 将在数据。
我已经尝试在每个线程中随机休眠一个时间,但这完全无法预测,并且本身并不能防止错误。
我该怎么做才能让所有线程处理不同的数据?
解决方案
根据我的评论,我认为使用 RabbitMQ 之类的消息代理最适合您的用例。使用 RabbitMQ 和类似的消息代理(我没有使用 0mq),您不需要提供其他线程,只需启动所需数量的线程,每个线程都订阅,然后代理将依次传递消息。
推荐阅读
- slider - angular 5 , flickity 滑块
- aggregate - API网关微服务查找方法
- javascript - vanillaJS 代码不会生成动态 HTML 内容
- android - Xamarin Forms Android 按钮材质阴影
- ios - 在 swift 3 中使用 xib 使集合视图标题出列时崩溃?
- deployment - 在 Windows 10 上部署 UWP,但不在商店中
- javascript - 为 webpack 运行节点模块的 npm 脚本?
- selenium - 使用 docker 设置 selenium Grid 以运行 webdriverio 测试的问题
- javascript - 确定在 if 条件中使用的布尔表达式中哪个条件为真
- java - 下面的代码是在我每次单击按钮时显示随机颜色。但尽管已完全编译,但它无法正常工作。谢谢