How to create multiple processes using Python multiprocessing inside a function?

Problem description

In my web application (built with the FastAPI framework), I have 2 CPU-intensive functions that are needed to create the final response.

These functions are unrelated to each other, so I planned to use processes to speed up the work. I wrote the following code for this:

import concurrent.futures
from concurrent.futures import as_completed


class SomeService:

    def __init__(self, ml_model1, ml_model2):
        self.ml_model1 = ml_model1
        self.ml_model2 = ml_model2

    def handle_request(self, request: QUSRequest):

        with concurrent.futures.ProcessPoolExecutor() as pool:
            futures = {pool.submit(self.some_cpu_intensive_task1, request): "some_cpu_intensive_task1",
                       pool.submit(self.some_cpu_intensive_task2,
                                   request): "some_cpu_intensive_task2"}
            response1 = None
            response2 = None
            for future in as_completed(futures):
                if futures[future] == "some_cpu_intensive_task1":
                    response1 = future.result()
                elif futures[future] == "some_cpu_intensive_task2":
                    response2 = future.result()

        response = APIResponse(response_a=response1, response_b=response2)

        return response

    def some_cpu_intensive_task1(self, request):

        ### some task
        return something

    def some_cpu_intensive_task2(self, request):

        ### some task
        return something

But this setup slows down everything else running in my application and takes a lot of time.

However, a simple version of the function without processes takes only about 6-7 milliseconds.

How do I run these tasks in processes inside a function (handle_request)?

Tags: python, python-multiprocessing

Solution


Your tasks, some_cpu_intensive_task1 and some_cpu_intensive_task2, are not particularly long-running relative to the time required to create a process pool. Consequently, you will gain nothing from using multiprocessing unless handle_request is called many times, so that the cost of creating the process pool is amortized across those calls. And even then, you must make sure the process pool is created only once and reused for all handle_request calls.

There is still some overhead in passing arguments and results between one process's address space and another's that you would not incur with a straightforward function call, and that further reduces the possible performance gain. The bottom line: the less CPU your "worker" functions do, the less you gain by using multiprocessing. That said, you need to make the following changes to see whether it is possible to gain anything (rather than lose) across multiple calls:

import concurrent.futures

class SomeService:
    # Must be a class variable or we can get: TypeError: cannot pickle 'weakref' object
    # We only need 2 workers (assuming there are no concurrent calls to handle_request):
    _pool = concurrent.futures.ProcessPoolExecutor(max_workers=2)

    def handle_request(self, request):
        # Since we are not proceeding until both tasks complete,
        # nothing is really being gained by using as_completed:
        future1 = self._pool.submit(self.some_cpu_intensive_task1, request)
        future2 = self._pool.submit(self.some_cpu_intensive_task2, request)
        return APIResponse(response_a=future1.result(), response_b=future2.result())

    def some_cpu_intensive_task1(self, request):

        ### some task
        return something

    def some_cpu_intensive_task2(self, request):

        ### some task
        return something
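The reason _pool must be a class variable, as the comment in the code notes, is that submitting a bound method pickles self; if the executor lived on the instance, pickling self would drag the unpicklable executor along with it. A minimal sketch of that failure mode (the Broken class is hypothetical, for illustration only):

```python
import concurrent.futures

class Broken:
    def __init__(self):
        # Executor stored on the instance: it becomes part of
        # self's state, and executors are not picklable.
        self.pool = concurrent.futures.ProcessPoolExecutor(max_workers=1)

    def task(self, x):
        return x * 2

    def run(self, x):
        # Submitting the bound method self.task pickles self,
        # which fails because self.pool cannot be pickled.
        return self.pool.submit(self.task, x).result()

if __name__ == '__main__':
    try:
        Broken().run(3)
    except Exception as e:
        # Typically a TypeError such as: cannot pickle '...' object
        print('submit failed:', type(e).__name__, e)
```

With the executor as a class variable it is not part of any instance's __dict__, so pickling self succeeds.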

Update

Here is a concrete example with REPETITIONS set to 10:

import concurrent.futures

REPETITIONS = 10

class SomeService:
    # Must be a class variable or we can get: TypeError: cannot pickle 'weakref' object
    # We only need 2 workers (assuming there are no concurrent calls to handle_request):
    _pool = concurrent.futures.ProcessPoolExecutor(max_workers=2)

    def handle_request(self, request):
        # Since we are not proceeding until both tasks complete,
        # nothing is really being gained by using as_completed:
        future1 = self._pool.submit(self.some_cpu_intensive_task1, request)
        future2 = self._pool.submit(self.some_cpu_intensive_task2, request)
        return (future1.result(), future2.result())

    def some_cpu_intensive_task1(self, request):
        sum = 0
        for _ in range(REPETITIONS):
            sum += request ** 2
        return sum

    def some_cpu_intensive_task2(self, request):
        sum = 0
        for _ in range(REPETITIONS):
            sum += request ** 3
        return sum

if __name__ == '__main__':
    s = SomeService()
    import time
    t = time.time()
    for _ in range(100):
        result = s.handle_request(4)
    print('Multiprocessing:', time.time() - t, result)
    t = time.time()
    for _ in range(100):
        result = s.some_cpu_intensive_task1(4), s.some_cpu_intensive_task2(4)
    print('Serial processing:', time.time() - t, result)

Prints:

Multiprocessing: 0.21735835075378418 (160, 640)
Serial processing: 0.0010030269622802734 (160, 640)

Multiprocessing degrades performance here because of the overhead of passing the argument to another process and getting the result back.

However, when we rerun with REPETITIONS set to 100_000 so that the worker functions some_cpu_intensive_task1 and some_cpu_intensive_task2 take much more time to execute, this is the new output:

Multiprocessing: 2.8213891983032227 (1600000, 6400000)
Serial processing: 4.49717116355896 (1600000, 6400000)
