首页 > 解决方案 > 运行一个并行进程,保存 Python 中主进程的结果

问题描述

我有一个为任务列表创建一些结果的函数。我想将结果即时保存到 1) 释放内存,而不是保存到附加到 results_list 和 2) 在出现错误时获得第一部分的结果。

这是一个非常简短的示例代码:

for task in task_list:
    result = do_awesome_stuff_to_task(task)
    save_nice_results_to_db(result)  # Send this job to another process and let the main process continue

主进程有没有办法为 task_list 中的每个任务创建结果,并且每次创建结果时都将其发送到另一个处理器/线程以保存它,因此主循环可以继续而不等待缓慢的保存过程?

我看过多处理,但这似乎主要是为了加快 task_list 上的循环,而不是允许辅助子进程完成工作的其他部分。我也研究了 asyncio,但这似乎主要用于 I/O。

总而言之,我正在寻找一种方法让主进程在 task_list 上循环。对于完成的每个任务,我想将结果发送到另一个子流程以保存结果。请注意,do_awesome_stuff_to_task 比保存过程快得多,因此,在保存第一个任务之前,主循环将通过多个任务。我想了两种方法来解决这个问题:

  1. 使用多个子进程保存
  2. 保存每 xx 次迭代 - save_results 规模没问题,所以也许保存过程可以在主循环连续时一次保存 xx 次迭代?

这可能与Python有关吗?在哪里寻找以及需要考虑哪些关键因素?

感谢所有帮助。

标签: python-3.xmultiprocessingsubprocesspython-asyncio

解决方案


如果没有测试,很难知道在你的情况下什么会更快,但这里有一些关于如何选择做什么的想法。

如果save_nice_results_to_db因为将数据写入磁盘或网络而速度较慢,请确保您尚未达到硬件的最大写入速度。根据另一端的服务器,网络流量有时可以从一次打开多个端口进行读/写中受益匪浅,只要您保持在总网络传输速度(mac 接口和 ISP 的)范围内。SSD 可以从一次启动多个读/写中看到一些有限的好处,但太多会损害性能。当尝试一次做多件事情时,HDD 几乎普遍较慢。一次读取/写入更大的块更有效。

multiprocessing通常必须在父进程和子进程之间传输数据,pickle因为它们不共享内存。这有很高的开销,所以如果result是一个大对象,您可能会浪费更多的时间来发送数据到子进程的额外开销,而不是任何类型的并发所节省的时间。(强调五月总是为自己测试)。从 3.8开始,shared_memory添加的模块可能更高效,但灵活性和易用性要低得多。

threading受益于所有线程共享内存,因此在线程之间“发送”数据的传输开销为零。然而,由于 GIL(全局解释器锁),Python 线程无法同时执行字节码,因此无法利用多个 CPU 内核来提高计算速度。这是因为 python 本身有很多不是线程安全的部分。用 c 编写的特定函数可能会释放此锁以解决此问题并使用线程利用多个 cpu 内核,但是一旦执行返回到 python 解释器,该锁将再次被持有。通常涉及网络访问或文件 IO 的函数可以释放 GIL,因为解释器正在等待通常是线程安全的操作系统调用。其他流行的库(如 Numpy)也在努力释放 GIL,同时对大型数组进行复杂的数学运算。但是,您只能从 c/c++ 代码中释放 GIL,而不是从 python 本身。

asyncio这里应该特别提到,因为它是专门为并发网络/文件操作而设计的。它使用协程而不是线程(甚至比线程更低的开销,线程本身的开销比进程低得多)来排队一堆操作,然后使用操作系统调用来等待它们中的任何一个完成(事件循环)。使用它还需要您do_awesome_stuff_to_task在协程中发生,以便它与save_nice_results_to_db.

将每个result关闭触发到要处理的线程的一个简单示例:

for task in task_list:
    result = do_awesome_stuff_to_task(task)
    threading.Thread(target=save_nice_results_to_db, args=(result,)).start()  # Send this job to another process and let the main process continue

推荐阅读