首页 > 解决方案 > 在 python 中实现类似 golang 的通道结构

问题描述

我试图channel在 python 中实现一个 golang。我使用 asyncio 包和一些简单的生成器来实现它。

import asyncio


def channel(capacity):
    # A very lazy channel implementation
    size = 0
    data = []
    while size < capacity:
        item = yield
        data.append(item)
        size += 1
    for item in data:
        yield item


async def worker(id:int, jobs:channel, results:channel):
    print(f"worker : {id} started")
    await asyncio.sleep(0)
    for job in jobs:
        print(f"worker : {id} received job {job}")
        result = job * job
        await asyncio.sleep(1)
        results.send(result)


async def main():
    jobs = channel(10)
    results = channel(10)
    # priming jobs and results channel
    next(jobs)
    next(results)
    for i in range(10):
        jobs.send(i)
    await worker(1, jobs, results)
    await worker(2, jobs, results)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

即使代码似乎运行,第二个工作人员也不会产生,直到第一个工作人员消耗所有工作。

我添加asyncio.sleep(0)了强制上下文切换,但第一个工作人员仍然没有将控制权返回给主协同程序。

我在这里做错了什么?跟本有关系channel generator吗?

结果 :

worker : 1 started
worker : 1 received job 1
worker : 1 received job 2
worker : 1 received job 3
worker : 1 received job 4
worker : 1 received job 5
worker : 1 received job 6
worker : 1 received job 7
worker : 1 received job 8
worker : 1 received job 9
worker : 2 started

标签: pythonpython-asyncio

解决方案


当你这样做时:

    await worker(1, jobs, results)

您正在等待这个协程结束,当它结束时,下一个协程被执行(一旦通道为空)。所以第一个工人消耗了所有的物品。

要解决这个问题,您必须同时运行两个协程。

使用collect,将所有协程转换为任务并等待所有协程完成:

    # with gather
    await asyncio.gather(
        worker(1, jobs, results),
        worker(2, jobs, results)
    )

使用任务(来自文档):

将 coro 协程包装成一个 Task 并安排其执行。返回任务对象。

然后等待任务。

    task_1 = asyncio.create_task(worker(1, jobs, results))
    task_2 = asyncio.create_task(worker(2, jobs, results))

    await task_1
    await task_2

也不要创建您的频道,将asyncio.Queue与 amaxsize作为频道一起使用。


推荐阅读