python - 异步套接字服务器作为生产者,工作人员正在使用它
问题描述
我开始使用 asyncio 模块,我想知道是否可以构建一个 tcp 服务器,将一些工作放入队列中以便一些工作人员执行它。
我尝试从 python 文档中的示例合并代码。
import asyncio
async def handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}")
writer.write(data)
await writer.drain()
print("Close the connection")
writer.close()
async def main():
server = await asyncio.start_server(
handle_echo, '127.0.0.1', 8888)
addr = server.sockets[0].getsockname()
print(f'Serving on {addr}')
async with server:
await server.serve_forever()
asyncio.run(main())
和工人
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())
刚开始写代码,很多问题就浮现在我的脑海里。
服务器是否创建自己的事件循环?
当工作人员从服务器填充的队列中消费作业时,我可以为服务器提供服务吗?
对于这种应用程序或引导人们了解异步带来的这些新术语是否有任何好的指南?
解决方案
我不太确定,但问题出在创建自己的事件循环的队列上,所以我必须在主异步函数中创建它。虽然start_serving
并serve_forever
没有任何区别。我仍在尝试和研究文档,所以我暂时接受这个答案。
from asyncio import Queue, create_task, gather, run, start_server
async def do_work(name: str, broker: Queue):
while True:
data = await broker.get()
print(f'worker `{name}` is consuming {data}')
broker.task_done()
async def main():
broker = Queue(maxsize=512)
async def handler(reader, writer):
data = await reader.read()
message = data.decode()
addr = writer.get_extra_info('peername')
print(f'Received {message!r} from {addr!r}')
print(f'Send: {message!r}')
writer.write(data)
await writer.drain()
print(f'Add work')
await broker.put(data)
print('Close the connection')
writer.close()
server = await start_server(handler, '127.0.0.1', 8888)
addr = server.sockets[0].getsockname()
print(f'Serving on {addr}')
# await server.start_serving()
workers = []
for i in range(3):
worker = create_task(do_work(f'worker-{i}', broker))
workers.append(worker)
# await gather(*workers)
async with server:
await server.serve_forever()
if __name__ == '__main__':
run(main())
推荐阅读
- arcgis - 无法为 WebGL 要素图层创建标签,不支持 esriGeometryPolyline
- apache-tomee - 如何使用jar文件区分TomEE不同版本?
- javascript - IF 语句将变量值带入开关的问题
- java - 我需要在java中初始化一个地图列表,但是我尝试过的每一种方式都将它初始化为null
- h2 - 如何像 sql server 一样在 h2 中使用 getDate()
- rtsp - 从 ffmpeg 到 rtsp 格式的 Mp4 视频
- java - 如何使用带有 Java 的 chrome 选项在 Selenoid 中下载文件
- php - 如何在 4 列中显示查询结果并在 4 列之后移动到另一行?
- java - 在 javax.annotation.processing.Processor 中获取 VariableElement 的封闭类名
- dart - Dart,Flutter:将 .jpg 文件上传到 Google Drive