python - 将多个 post 请求合并为一个,转换此批并返回这些 post 请求的答案
问题描述
我不擅长处理请求,但我目前的项目需要这个。现在我的服务器是这样工作的:
from aiohttp import web
@routes.post('/')
async def my_func(request):
post = await request.json()
answer = '... do something on GPU ...'
return web.json_response(answer)`
但是我想将多个请求合并为一个,并且只在 GPU 上执行一次我的功能。然后返回所有请求的响应(可能在循环中)。如果需要解决,我可以将 aoihttp 更改为不同的包。
例如,post 请求包含字段:{'id':1, 'data':'some data 1'}。
(1) 我要等待 5 个请求,合并数据以列出 ['some data 1', ..,'some data 5']
(2) 然后将我的函数应用于此列表(它返回我的答案列表 ['answer 1', ..,'answer 5']
(3) 然后我想对每个请求做出响应,例如 {'id':1, 'answers':'answer_1'}
我现在不知道如何实现步骤(1)和(3)。
解决方案
您可以保留(问题)和(答案)的缓存以及检查所述缓存的缓存;当问题缓存长度达到 5 时,您运行GPU 函数并填充答案缓存。
每个请求都会等待,直到答案缓存具有所需的数据。requests
responses
background task
服务器.py
import asyncio
from aiohttp import web
def gpu_func(items):
"""Send a batch of 5 questions to GPU and return the answers"""
answers = {}
for item in items:
answers[item["id"]] = "answer for data: " + item["data"]
return answers
async def gpu_loop(app):
"""Check questions cache continuously and when we have 5 questions process them and populate answers cache"""
while True:
if len(app.cache["questions"]) >= 5:
print("running GPU function")
answers = gpu_func(app.cache["questions"][:5])
print("got %d answers from GPU" % len(answers))
app.cache["answers"].update(answers)
app.cache["questions"] = app.cache["questions"][5:]
await asyncio.sleep(0.05) # sleep for 50ms
async def handle(request):
"""Main request handler: populate questions cache and wait for the answer to be available in the answers cache"""
data = await request.post()
print("got request with data ", data)
request.app.cache["questions"].append(data)
# can implement here a time limit using a counter (sleep_delay*counter = max time for request)
while True:
if data["id"] in request.app.cache["answers"]:
break
await asyncio.sleep(0.05)
answer = request.app.cache["answers"].pop(data["id"], "unknown")
return web.Response(text=answer)
# create background task (gpu_loop)
async def start_background_tasks(app):
app.gpu_loop = asyncio.create_task(gpu_loop(app))
# stop background task on shutdown
async def cleanup_background_tasks(app):
app.gpu_loop.cancel()
await app.gpu_loop
def main():
app = web.Application()
app.cache = {"questions": [], "answers": {}}
app.add_routes([web.post("/", handle)])
app.on_startup.append(start_background_tasks)
app.on_cleanup.append(cleanup_background_tasks)
web.run_app(app)
if __name__ == "__main__":
main()
客户端.py
import aiohttp
import asyncio
async def make_request(session, num):
"""Make a single request using the existing session object and custom number"""
url = "http://127.0.01:8080"
data = {"id": num, "data": "question %d" % num}
response = await session.post(url, data=data)
text = await response.text()
return text
async def main():
"""Make 20 consecutive requests with a delay of 20 ms between them"""
tasks = []
session = aiohttp.ClientSession()
for i in range(20):
print("making request %d", i)
task = asyncio.ensure_future(make_request(session, i))
tasks.append(task)
await asyncio.sleep(0.02)
responses = await asyncio.gather(*tasks)
for response in responses:
print(response)
await session.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
根据并发性(从客户端发出请求的频率)和处理时间(处理一组请求需要多长时间),您可能需要调整时间(睡眠)和缓存限制(保留多少请求在缓存中)。
希望这能让你开始。
推荐阅读
- typescript - Visual Studio Code 不识别打字稿文件(*.ts 和 *.tx)
- java - 如何在没有注释的情况下实现自定义类级别的约束验证?
- python - tqdm 在循环开始时打印额外内容
- javascript - 如何在表单正文中获取(orderId)有效负载值以更新订单状态?
- html - 是否可以在下面一行中将一个 flex 子项对齐到另一个 flex 子项上方?
- jquery - 在 Wordpress 菜单中重置 URL
- ruby-on-rails - 如何使用 sidekiq 在 Rails 中的特定日期安排工作
- c++ - 如何阅读 [dcl.fct] 部分中的“函数类型”
- vb.net - 如何使用 ADO.NET 将我的 vb.net 项目连接到 mariadb?
- javascript - 为什么我无法捕获函数之外的错误?