python - 多处理中的 Python asyncio。每个进程一个事件循环
问题描述
我正在为我的团队编写一个函数,该函数将从云中下载一些数据。该函数本身是一个常规的 python 函数,但在底层,它使用 asyncio。因此,我在函数中创建了一个事件循环,并让异步协程同时进行下载。数据下载后,我对其进行处理并返回结果。
当我从任何其他 Python 函数调用它时,我的函数按预期工作。但是,当我尝试使用多处理并行化它时,我偶尔会看到一些 IOErrors。
我试图搜索有关如何实现此目的的示例,但找不到任何示例。我只看到使用 aconcurrent.futures
并让事件循环run_in_executor
进行并行化的建议。这对我来说不是一个选择,因为我想对我的团队隐藏所有异步的东西,只为他们提供这个简单的 Python 函数,他们可以从他们的代码中调用(可能在多处理中)。我在网上看到了关于为什么这是一个坏主意以及为什么我不应该隐藏异步内容的争论,但就我而言,我的团队不是精明的程序员。他们永远不会使用(或费心去理解)asyncio,所以一个简单的 python 函数对我们最有效。
最后,这是一个伪示例,显示了我正在尝试做的事情:
import asyncio
import aiohttp
from typing import List
async def _async_fetch_data(symbol: str) -> bytes:
'''
Download stock data for given symbol from yahoo finance.
'''
async with asyncio.BoundedSemaphore(50), aiohttp.ClientSession() as session:
try:
url = f'https://query1.finance.yahoo.com/v8/finance/chart/{symbol}?symbol={symbol}&period1=0&period2=9999999999&interval=1d'
async with session.get(url) as response:
return await response.read()
except:
return None
def fetch_data(symbols: List[str]) -> List[bytes]:
'''
Gateway function that wraps the under the hood async stuff
'''
coroutine_list = [_async_fetch_data(x) for x in symbols]
if len(coroutine_list) == 0:
return []
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
data = loop.run_until_complete(asyncio.wait(coroutine_list))[0]
loop.close()
return [d.result() for d in data if d.result() is not None]
如果我将其运行为
>>> data = fetch_data(['AAPL', 'GOOG'])
但我担心当我这样做时它是否会正常运行
>>> from multiprocessing import Pool as ProcessPool
>>> with ProcessPool(2) as pool:
data = [j for i in pool.map(fetch_data, [['AAPL', 'GOOG'], ['AMZN', 'MSFT']]) for j in i]
我偶尔会看到 IOErrors,但我无法重现它们,我不确定是因为我将 asyncio 与多处理混合在一起,还是因为其他原因。
解决方案
推荐阅读
- python - 提取子字符串并隐藏到新列中的浮点数
- blockchain - 是否可以在创建 Solidity 智能合约时保留一个地址用于征税?
- javascript - X3D:如何为 IndexedFaceSet 动态添加坐标到 DOM 中?
- google-cloud-platform - GCP 允许列出 GCS 对象但禁止下载
- typo3 - 如何使用 TYPO3 10 以 FluidTYPO3 助焊剂形式拥有多个带有图像的部分?
- python - 谁能告诉我为什么除了所需的数字之外它最后返回 None 值?
- python-3.x - 在装饰器中使用 ProcessPoolExecutor 酸洗 Python 函数失败
- dart - 合并多个 Uint8List.view
- javascript - Uncaught TypeError: $(...).fullCalendar is not a function 是当我尝试将日历引入引导模板时出现的问题
- javascript - 静态类比 Javascript 中的单例具有更好的内存性能吗?