python - 如何在函数内使用 python 多处理创建多个进程?
问题描述
在我的 Web 应用程序(使用 Fast API 框架)中,我有 2 个 CPU 密集型函数需要创建最终响应。
这些功能不相关,因此我计划使用流程来加快工作速度。我为此编写了以下代码:
class SomeService:
def __init__(self, ml_model1, ml_model2):
self.ml_model1 = ml_model1
self.ml_model2 = ml_model2
def handle_request(self, request: QUSRequest):
with concurrent.futures.ProcessPoolExecutor() as pool:
futures = {pool.submit(self.some_cpu_intensive_task1, request): "some_cpu_intensive_task1",
pool.submit(self.some_cpu_intensive_task2,
request): "some_cpu_intensive_task2"}
response1 = None
response2 = None
for future in as_completed(futures):
if futures[future] == "some_cpu_intensive_task1":
response1 = future.result()
elif futures[future] == "some_cpu_intensive_task2":
response2 = future.result()
response = APIResponse(response_a=response1, response_b = response2)
return response
def some_cpu_intensive_task1(request):
### some task
return something;
def some_cpu_intensive_task2(request):
### some task
return something;
但是这个设置在我的应用程序中运行所有其他东西并且需要大量时间。
然而,一个没有进程的简单函数大约需要 6-7 毫秒。
如何在函数(handle_request)中编写进程?
解决方案
相对于创建进程池所需的时间而言,您的任务并不是特别长时间运行的some_cpu_intensive_task1
函数。some_cpu_intensive_task2
因此,如果handle_request
没有多次调用函数来分摊在多次调用中创建进程池的成本,那么使用多处理将不会获得任何收益。但是,即使那样,您也必须确保只创建一次进程池并将其重用于所有handle_request
调用。
在将参数和结果从一个进程的地址空间传递到另一个进程的地址空间以及从另一个进程的地址空间传递时仍然存在一些开销,如果您只是进行直接的函数调用,则不会产生这些开销,因此也减少了可能的性能提升。最重要的是,您的“工作者”函数执行的 CPU 越少,使用多处理获得的收益就越少。也就是说,您需要进行以下更改,以查看是否有可能通过多次调用获得任何收益(而不是损失):
import concurrent.futures
class SomeService:
# Must be a class variable or we can get: TypeError: cannot pickle 'weakref' object
# We only need 2 workers (assuming there are no concurrent calls to handle_request):
_pool = concurrent.futures.ProcessPoolExecutor(max_workers=2)
def handle_request(self, request):
# Since we are not proceeding until both tasks complete,
# nothing is really being gained by using as_completed:
future1 = self._pool.submit(self.some_cpu_intensive_task1, request)
future2 = self._pool.submit(self.some_cpu_intensive_task2, request)
return APIResponse(response_a=future1.result(), response_b=future2.result())
def some_cpu_intensive_task1(self, request):
### some task
return something
def some_cpu_intensive_task2(self, request):
### some task
return something
更新
REPETITIONS
这是一个设置为的具体示例10
:
import concurrent.futures
REPETITIONS = 10
class SomeService:
# Must be a class variable or we can get: TypeError: cannot pickle 'weakref' object
# We only need 2 workers (assuming there are no concurrent calls to handle_request):
_pool = concurrent.futures.ProcessPoolExecutor(max_workers=2)
def handle_request(self, request):
# Since we are not proceeding until both tasks complete,
# nothing is really being gained by using as_completed:
future1 = self._pool.submit(self.some_cpu_intensive_task1, request)
future2 = self._pool.submit(self.some_cpu_intensive_task2, request)
return (future1.result(), future2.result())
def some_cpu_intensive_task1(self, request):
sum = 0
for _ in range(REPETITIONS):
sum += request ** 2
return sum
def some_cpu_intensive_task2(self, request):
sum = 0
for _ in range(REPETITIONS):
sum += request ** 3
return sum
if __name__ == '__main__':
s = SomeService()
import time
t = time.time()
for _ in range(100):
result = s.handle_request(4)
print('Multiprocessing:', time.time() - t, result)
t = time.time()
for _ in range(100):
result = s.some_cpu_intensive_task1(4), s.some_cpu_intensive_task2(4)
print('Serial processing:', time.time() - t, result)
印刷:
Multiprocessing: 0.21735835075378418 (160, 640)
Serial processing: 0.0010030269622802734 (160, 640)
由于将参数传递给另一个进程并从另一个进程取回结果的开销,多处理会降低性能。
REPETITIONS
但是,当我们使用set to重新运行以100_000
使 worker 运行some_cpu_intensive_task1
并some_cpu_intensive_task1
花费更多时间来执行时,这是新的输出:
Multiprocessing: 2.8213891983032227 (1600000, 6400000)
Serial processing: 4.49717116355896 (1600000, 6400000)
推荐阅读
- python - 如何从熊猫输出中删除名称和数据类型
- shell - “sort -t . -k 1,1n -k 2,2n -k 3,3n -k 4,4n”是如何工作的?
- html - 调整大小时,HTML 部分应位于图像中间
- javascript - 什么样的 JS 对象是 [foo: 'bar']
- python - 需要在bash中传递python数组中的对象
- c# - C# 将变量 double 转换为浮点数
- android - AudioRecord 同时播放音频 - 访问输出播放数据
- git - 如何在检查先前的提交并对其进行更改后推送提交
- matlab - MATLAB - 使用 buffer() 分割向量的问题
- php - 在 Woocommerce 的任何页面上仅显示单个链接的产品类别文本