python - 在 python 中实现类似 golang 的通道结构
问题描述
我试图channel
在 python 中实现一个 golang。我使用 asyncio 包和一些简单的生成器来实现它。
import asyncio
def channel(capacity):
# A very lazy channel implementation
size = 0
data = []
while size < capacity:
item = yield
data.append(item)
size += 1
for item in data:
yield item
async def worker(id:int, jobs:channel, results:channel):
print(f"worker : {id} started")
await asyncio.sleep(0)
for job in jobs:
print(f"worker : {id} received job {job}")
result = job * job
await asyncio.sleep(1)
results.send(result)
async def main():
jobs = channel(10)
results = channel(10)
# priming jobs and results channel
next(jobs)
next(results)
for i in range(10):
jobs.send(i)
await worker(1, jobs, results)
await worker(2, jobs, results)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
即使代码似乎运行,第二个工作人员也不会产生,直到第一个工作人员消耗所有工作。
我添加asyncio.sleep(0)
了强制上下文切换,但第一个工作人员仍然没有将控制权返回给主协同程序。
我在这里做错了什么?跟本有关系channel generator
吗?
结果 :
worker : 1 started
worker : 1 received job 1
worker : 1 received job 2
worker : 1 received job 3
worker : 1 received job 4
worker : 1 received job 5
worker : 1 received job 6
worker : 1 received job 7
worker : 1 received job 8
worker : 1 received job 9
worker : 2 started
解决方案
当你这样做时:
await worker(1, jobs, results)
您正在等待这个协程结束,当它结束时,下一个协程被执行(一旦通道为空)。所以第一个工人消耗了所有的物品。
要解决这个问题,您必须同时运行两个协程。
使用collect,将所有协程转换为任务并等待所有协程完成:
# with gather
await asyncio.gather(
worker(1, jobs, results),
worker(2, jobs, results)
)
使用任务(来自文档):
将 coro 协程包装成一个 Task 并安排其执行。返回任务对象。
然后等待任务。
task_1 = asyncio.create_task(worker(1, jobs, results))
task_2 = asyncio.create_task(worker(2, jobs, results))
await task_1
await task_2
也不要创建您的频道,将asyncio.Queue与 amaxsize
作为频道一起使用。
推荐阅读
- c++ - 根据 C++ 中的字符串输入将字节数组转换为相应的值
- sql-server - 在 WIN-PAK 4.7 MSSQL 数据库中解码二进制数据
- knex.js - upsert 语句中的列引用 xxx 不明确(候选人:excluded.xxx,table.xxx)
- c# - 如何以编程方式将生成的 resx 文件添加到我的 cproj
- xml - 从 xml 树迭代器返回结果
- c++11 - 相对变换的迭代乘法导致快速不稳定性
- xml - 如何将时间从 UTC 转换为 pom.xml 中的本地时间?
- javascript - 用于 App.vue 的 Main.js 调用 API 来填充值
- javascript - 如何将一个输入框中的值复制到另一个输入框中?
- node.js - 我应该在哪里存储 code_verifier(使用 PKCE 的 oauth 2.0 代码授权流程)