python - 并行读取和顺序写入?
问题描述
我有以下代码可以id
按顺序读取和写入。
async def main():
while id < 1000:
data = await read_async(id)
await data.write_async(f'{id}.csv')
id += 1
read_async()
需要几分钟,write_async()
运行不到一分钟。现在我想
- 并行运行
read_async(id)
。但是,由于内存限制,最多可以并行运行 3 个调用。 write_async
必须按顺序运行,即write_async(n+1)
不能在write_async(n)
.
解决方案
您可以使用队列和固定数量的任务来读取和写入主任务。主要任务可以使用事件来找出读者可以使用新数据,并使用共享字典从他们那里获取数据。例如(未经测试):
async def reader(q, id_to_data, data_ready):
while True:
id = await q.get()
data = await read_async(id)
id_to_data[id] = data
data_ready.set()
async def main():
q = asyncio.Queue()
for id in range(1000):
await q.put(id)
id_to_data = {}
data_ready = asyncio.Event()
readers = [asyncio.create_task(reader(q, id_to_data, data_ready))
for _ in 3]
for id in range(1000):
while True:
# wait for the current ID to appear before writing
if id in id_to_data:
data = id_to_data.pop(id)
await data.write_async(f'{id}.csv')
break
# move on to the next ID
else:
# wait for new data and try again
await data_ready.wait()
data_ready.clear()
for r in readers:
r.cancel()
对结果使用单独的队列而不是事件是行不通的,因为队列是无序的。优先级队列会解决这个问题,它仍然会立即返回当前可用的最低 id,而编写器需要下一个id 才能按顺序处理所有 id。
推荐阅读
- go - 如何使用flask_restplus定义字典字段以用于使用swagger codegen生成的go代码?
- python - 连接池请求 Python
- spring-data-jpa - 春季分页不检索数据库的数据
- rest - Magento 2 - 自动产品信息传输(休息,肥皂?)
- pandoc - 将 Markdown 转换为 HTML
- css - 需要更长的图片描述
- mysql - 可以更改 mysql sgbd 上每个数据库的特定表吗?
- c++ - 将字符串数组传递给函数
- azure - 如何在 Azure Function c# 中使用正则表达式路由
- python - 根据几个条件替换熊猫数据框中的值