python - Returning values when using multiprocessing in Python
Question
Background
I currently have some code that looks like this:
failed_player_ids: Set[str] = set()
for player_id in player_ids:
    success = player_api.send_results(
        player_id, user=user, send_health_results=True
    )
    if not success:
        failed_player_ids.add(player_id)
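This code works fine, but the problem is that each call takes 5 seconds. There is a rate limit of 2000 calls per minute, so I am far below the maximum capacity. I want to parallelize this to speed things up. This is my first time using the multiprocessing library in Python, so I am a little confused about how to proceed. I can describe what I want to do in words.
In my current code, I loop over the list of player_ids. If the API response is a success, I do nothing; if it fails, I make a note of that player ID.
I am not sure how to implement a parallel version of this code. I have some ideas, but I am a little confused.
This is what I have come up with so far: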
from multiprocessing import Pool

num_processors_to_use = 5 # This is a number can be increased to get more speed

def send_player_result(player_id_list: List[str]) -> Optional[str]:
    for player_id in player_id_list:
        success = player_api.send_results(player_id, user=user, send_health_results=True)
        if not success:
            return player_id

# Caller
with Pool(processes=num_processors_to_use) as pool:
    responses = pool.map(
        func=send_player_result,
        iterable=player_id_list,
    )
    failed_player_ids = Set(responses)
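Any comments and suggestions would be helpful.
Solution
If you are using the map function, then each item of the iterable player_id_list is passed as a separate task to the function send_player_result. Consequently, this function should no longer expect to be passed a list of player IDs, but rather a single player ID. And, as you know by now, if your tasks are largely I/O bound, then multithreading is a better model. You can use either: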
from multiprocessing.dummy import Pool
# or
from multiprocessing.pool import ThreadPool
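As a quick check that the two imports above lead to the same kind of pool, here is a minimal sketch. It assumes CPython's standard library, where multiprocessing.dummy.Pool is a small wrapper that returns a ThreadPool; the squaring task is only a placeholder for the real API call.

```python
from multiprocessing.dummy import Pool
from multiprocessing.pool import ThreadPool

with Pool(2) as pool:
    # multiprocessing.dummy.Pool hands back a thread-based pool.
    is_thread_pool = isinstance(pool, ThreadPool)
    # Threads share memory, so nothing is pickled and a lambda is
    # acceptable here (it would not be with a process-based Pool).
    squares = pool.map(lambda n: n * n, [1, 2, 3])
```

Because both names give a thread pool, the choice between them is mostly a matter of readability.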
You will probably want to increase the number of threads considerably (but not beyond the size of player_id_list):
#from multiprocessing import Pool
from multiprocessing.dummy import Pool
from typing import Set

def send_player_result(player_id):
    success = player_api.send_results(player_id, user=user, send_health_results=True)
    return success

# Only required for Windows if you are doing multiprocessing:
if __name__ == '__main__':
    pool_size = 5 # This number can be increased to get more concurrency

    # Caller
    failed_player_ids: Set[str] = set()
    with Pool(pool_size) as pool:
        # map returns the results in the same order as player_id_list
        results = pool.map(func=send_player_result, iterable=player_id_list)
        for idx, success in enumerate(results):
            if not success:
                # failed for argument player_id_list[idx]:
                failed_player_ids.add(player_id_list[idx])
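For anyone who wants to try the pattern without access to player_api, the following is a self-contained sketch of the same idea. The function fake_send_results is a hypothetical stand-in for player_api.send_results (it simply fails for IDs ending in "x"), and collect_failures is an illustrative helper name, not part of the original answer.

```python
from multiprocessing.pool import ThreadPool
from typing import List, Set


def fake_send_results(player_id: str) -> bool:
    """Hypothetical stand-in for player_api.send_results: IDs ending in 'x' fail."""
    return not player_id.endswith("x")


def collect_failures(player_id_list: List[str], pool_size: int = 5) -> Set[str]:
    """Run one call per player ID on a thread pool and return the IDs that failed."""
    if not player_id_list:
        return set()
    # There is no benefit in starting more threads than there are tasks.
    workers = min(pool_size, len(player_id_list))
    with ThreadPool(workers) as pool:
        # map() blocks until all calls finish and preserves input order.
        results = pool.map(fake_send_results, player_id_list)
    # Pair each ID with its result instead of indexing back into the list.
    return {pid for pid, ok in zip(player_id_list, results) if not ok}


if __name__ == "__main__":
    print(collect_failures(["a1", "b2x", "c3", "d4x"]))
```

Pairing IDs and results with zip relies on map returning results in input order, which is the same property the index-based loop above depends on.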