python - 使用 asyncio 协程并行运行函数?
问题描述
我有以下代码从数据库(read_db
)读取数据并将数据写入镶木地板文件(data.to_parquet
)。两个 I/O 操作都需要一段时间才能运行。
def main():
while id < 1000:
logging.info(f'reading - id: {id}')
data = read_db(id) # returns a dataframe
logging.info(f'saving - id: {id}')
data.to_parquet(f'{id}.parquet')
logging.info(f'saved - id: {id}')
id += 1
它很慢,所以我想要read_db(n+1)
同时to_parquet(n)
运行。我需要保持id
按顺序完成的每个步骤(read_db(n+1)
需要在之后运行read_db(n)
并在之后data.to_parquet(n+1)
运行data.to_parquet(n)
。)。这是异步版本
def async_wrap(f):
@wraps(f)
async def run(*args, loop=None, executor=None, **kwargs):
if loop is None:
loop = asyncio.get_event_loop()
p = partial(f, *args, **kwargs)
return await loop.run_in_executor(executor, p)
return run
async def main():
read_db_async = async_wrap(read_db)
while id < 1000:
logging.info(f'reading - id: {id}')
data = await read_db_async(id) # returns a dataframe
logging.info(f'saving - id: {id}')
to_parquet_async = async_wrap(data.to_parquet)
await data.to_parquet(f'{id}.parquet')
logging.info(f'saved - id: {id}')
id += 1
asyncio.get_event_loop().run_until_complete(main())
我除了看到一些乱序的日志:
reading - id: 1
saving - id: 1 (saving 1 and reading 2 run in parallel)
reading - id: 2
saved - id: 1
saving - id: 2
reading - id: 3
saved - id: 2
.....
但是,实际的日志和同步代码是一样的吗?
reading - id: 1
saving - id: 1
saved - id: 1
reading - id: 2
saving - id: 2
saved - id: 2
reading - id: 3
.....
解决方案
您可以使用或等效项同时创建read_db(n+1)
和运行:to_parquet(n)
gather
async def main():
read_db_async = async_wrap(read_db)
prev_to_parquet = asyncio.sleep(0) # no-op
for id in range(1, 1000):
data, _ = await asyncio.gather(read_db_async(id), prev_to_parquet)
to_parquet_async = async_wrap(data.to_parquet)
prev_to_parquet = to_parquet_async(f'{id}.parquet')
await prev_to_parquet
推荐阅读
- c# - 在多个绑定上托管 IIS 站点时如何获取正确的本地地址 URI?
- c# - 在单元测试中返回 FileContentResult 时,Http 响应标头解析器失败
- django - 跟踪哪个用户更新了任何模型的哪些字段?
- node.js - Raspberry Pi 上的 NPM 无法正常工作
- amazon-web-services - AWS 服务:使用 cloudwatch s3 put 事件触发批处理作业,文件 url 作为环境变量
- php - 为什么我的代码不能在 php 中使用 gettext 进行翻译?
- c# - Excel 文件的连接字符串的扩展属性的 Excel 部分
- scala - 如何将 DataFrame 的所有列(带有嵌套的 StructTypes 和嵌套的 ArrayType)转换为 Spark 中的字符串
- c# - 如何在列表中找到日期早于指定日期但所有数据类型都是字符串的条目?
- c# - “/”应用程序文件加载异常中的服务器错误