Returning values with multiprocessing in Python

Question

Background

I currently have some code that looks like this:

failed_player_ids: Set[str] = set()
for player_id in player_ids:
    success = player_api.send_results(
        player_id, user=user, send_health_results=True
    )
    if not success:
        failed_player_ids.add(player_id)

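This code works fine, but the problem is that each call takes 5 seconds. There is a rate limit of 2000 calls per minute, so I am far below the maximum capacity. I would like to parallelize this to speed it up. This is my first time using the multiprocessing library in Python, so I am a little confused about how to proceed. I can describe in words what I want to do.

In my current code, I loop through the list of player_ids; if the API response is a success I do nothing, and if it fails I make a note of that player ID.

I am not sure how to implement a parallel version of this code. I have some ideas, but I am a little confused.

This is what I have come up with so far: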

from multiprocessing import Pool

num_processors_to_use = 5  # This number can be increased to get more speed

def send_player_result(player_id_list: List[str]) -> Optional[str]:
    for player_id in player_id_list:
        success = player_api.send_results(player_id, user=user, send_health_results=True)
        if not success:
            return player_id

# Caller
with Pool(processes=num_processors_to_use) as pool:
    responses = pool.map(
        func=send_player_result,
        iterable=player_id_list,
    )
    failed_player_ids = Set(responses)

 

Any comments and suggestions would be helpful.

Tags: python, python-multiprocessing

Answer


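If you use the map function, each item of the iterable player_id_list is passed as a separate task to the function send_player_result. Consequently, this function should no longer expect to be passed a list of player ids, but rather a single player id. And, as you know by now, if your tasks are largely I/O bound, multithreading is a better model. You can use either: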

from multiprocessing.dummy import Pool
# or
from multiprocessing.pool import ThreadPool
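
Both imports should give you the same thread-backed pool: as far as I know, multiprocessing.dummy.Pool is a thin wrapper that returns a ThreadPool. A minimal sketch to check that, and to show that map hands each item to the worker individually and returns the results in input order (shout is a made-up stand-in worker, not part of the original code):

```python
from multiprocessing.dummy import Pool as DummyPool
from multiprocessing.pool import ThreadPool


def shout(player_id: str) -> str:
    # Hypothetical worker: receives ONE item of the iterable, not the whole list.
    return player_id.upper()


with DummyPool(3) as pool:
    # The object returned by multiprocessing.dummy.Pool is a ThreadPool.
    same_kind = isinstance(pool, ThreadPool)
    # map() blocks until all tasks finish and keeps the input order.
    results = pool.map(shout, ["a", "b", "c"])

print(same_kind)  # True
print(results)    # ['A', 'B', 'C']
```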

You will probably want to greatly increase the number of threads (but not beyond the size of player_id_list):

#from multiprocessing import Pool
from multiprocessing.dummy import Pool
from typing import Set

def send_player_result(player_id):
    success = player_api.send_results(player_id, user=user, send_health_results=True)
    return success

# Only required for Windows if you are doing multiprocessing:
if __name__ == '__main__':
    
    pool_size = 5  # This number can be increased to get more concurrency
    
    # Caller
    failed_player_ids: Set[str] = set()
    with Pool(pool_size) as pool:
        results = pool.map(func=send_player_result, iterable=player_id_list)
        for idx, success in enumerate(results):
            if not success:
                # failed for argument player_id_list[idx]:
                failed_player_ids.add(player_id_list[idx])
            
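
To try the pattern without the real service, here is a self-contained sketch. The names fake_send_results and collect_failures, the sample ids, and the rule that ids ending in "3" fail are all invented for illustration; fake_send_results merely stands in for player_api.send_results. It pairs ids with results using zip instead of indexing, which is a style choice rather than a correction.

```python
import time
from multiprocessing.pool import ThreadPool
from typing import List, Set


def fake_send_results(player_id: str) -> bool:
    # Hypothetical stand-in for player_api.send_results: sleeps briefly to
    # mimic a slow I/O-bound call, and fails for ids ending in "3".
    time.sleep(0.05)
    return not player_id.endswith("3")


def collect_failures(player_ids: List[str], pool_size: int) -> Set[str]:
    # map() returns results in the same order as the input, so zip pairs
    # each id with its own outcome.
    with ThreadPool(pool_size) as pool:
        results = pool.map(fake_send_results, player_ids)
    return {pid for pid, ok in zip(player_ids, results) if not ok}


if __name__ == "__main__":
    ids = [f"p{i}" for i in range(20)]
    failed = collect_failures(ids, pool_size=10)
    print(sorted(failed))  # ['p13', 'p3']
```

On sizing the pool: if each call takes about 5 seconds, one thread makes roughly 12 calls per minute, so N threads make about 12 × N. Under that assumption, the 2000-calls-per-minute limit from the question would be reached at around 166 threads.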
