首页 > 解决方案 > 并行读取和顺序写入?

问题描述

我有以下代码可以id按顺序读取和写入。

async def main():
    while id < 1000:
       data = await read_async(id) 
       await data.write_async(f'{id}.csv')
       id += 1
       

read_async()需要几分钟,write_async()运行不到一分钟。现在我想

  1. 并行运行read_async(id)。但是,由于内存限制,最多可以并行运行 3 个调用。
  2. write_async必须按顺序运行,即write_async(n+1)不能在write_async(n).

标签: pythonpython-3.xasynchronouspython-asyncio

解决方案


您可以使用队列和固定数量的任务来读取和写入主任务。主要任务可以使用事件来找出读者可以使用新数据,并使用共享字典从他们那里获取数据。例如(未经测试):

async def reader(q, id_to_data, data_ready):
    while True:
        id = await q.get()
        data = await read_async(id) 
        id_to_data[id] = data
        data_ready.set()

async def main():
    q = asyncio.Queue()
    for id in range(1000):
        await q.put(id)

    id_to_data = {}
    data_ready = asyncio.Event()
    readers = [asyncio.create_task(reader(q, id_to_data, data_ready))
               for _ in 3]

    for id in range(1000):
       while True:
           # wait for the current ID to appear before writing
           if id in id_to_data:
               data = id_to_data.pop(id)
               await data.write_async(f'{id}.csv')
               break
               # move on to the next ID
           else:
               # wait for new data and try again
               await data_ready.wait()
               data_ready.clear()

    for r in readers:
        r.cancel()

对结果使用单独的队列而不是事件是行不通的,因为队列是无序的。优先级队列会解决这个问题,它仍然会立即返回当前可用的最低 id,而编写器需要下一个id 才能按顺序处理所有 id。


推荐阅读