python-3.x - 块大小或超时异步python
问题描述
我试图找出一种简单的方法来解开队列,方法是获取指定chunk_size和timeout的块。
例如,我希望get_chunks
函数返回一个chunk_size项目的列表,如果它需要少于超时来获取它们,否则返回一个长度在 9 和chunk_size之间的列表。
这是到目前为止的代码:
import asyncio
async def populate(queue):
for i in range(0, 100):
await queue.put(i)
async def _get_chunks(queue, chunk_size):
items = []
for i in range(0, chunk_size):
items.append(await queue.get())
await asyncio.sleep(0.2)
return items
async def get_chunks(queue, chunk_size, timeout):
while True:
yield _get_chunks(queue, chunk_size)
async def listen():
queue = asyncio.Queue()
await populate(queue)
print(f'{queue.qsize()} items in queue')
async for chunk in get_chunks(queue, 10, 1):
print(await chunk)
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(listen())
if __name__ == '__main__':
main()
我认为有一种方法可以做到这asyncio.wait
一点:
done, not_done = asyncio.wait([_get_chunks(queue, size),
asyncio.sleep(timeout)],
return_when=asyncio.FIRST_COMPLETE)
items = done.pop().result()
但我无法在asyncio.sleep
第一次返回时得到结果。
解决方案
您无法获得结果,因为_get_chunks
尚未完成。一个简单的解决方法是在和它的调用者之间有一些共享状态_get_chunks
:
async def _get_chunks(queue, chunk_size, out):
for i in range(0, chunk_size):
out.append(await queue.get())
await asyncio.sleep(0.2)
然后你可以使用 实现超时wait_for
,它会自动取消超时协程:
items = []
try:
asyncio.wait_for(_get_chunk(queue, size, items))
except asyncio.TimeoutError:
pass
# items now contains the elements _get_chunk managed to extract
# from the queue within the alotted time
推荐阅读
- lua - 在 Lua 中,访问通过用户数据表寻址的 C 函数的评估值
- android - 在线性布局中使用权重后如何删除文本和视图之间的空间?
- python - GUI Tkinter 闹钟没有响应
- node.js - 筛选具有可以是或 true 或 null MongoDB 的属性的文档
- iis - Azure AD 登录问题仅在 IIS 服务器上运行应用程序时发生
- postgresql - 有没有比在 postgres 中使用的临时表更好的方法来从函数返回表(结果)?
- angular - 用于导航和 getCurrentNavigation 的 Angular Jasmine 测试用例不起作用
- swift - 编辑后保存 PDF 文档 - Swift、PDFKIT、Pencilkit
- django - django 数据库模型关系
- c++ - 数据不会在 Qt TexttEdit 小部件中实时反映