Combine multiple POST requests into a single batch, process the batch, and return the answer for each request

Problem description

I'm not very experienced with handling requests, but my current project needs this. Right now my server works like this:

from aiohttp import web

routes = web.RouteTableDef()

@routes.post('/')
async def my_func(request):
    post = await request.json()
    answer = '... do something on GPU ...'
    return web.json_response(answer)

But I want to combine several requests into one batch and run my function on the GPU only once, then return a response to each of the requests (possibly in a loop). If that's what it takes, I'm happy to switch from aiohttp to a different package.

For example, a POST request contains the fields {'id': 1, 'data': 'some data 1'}.

(1) I want to wait for 5 requests and merge their data into a list ['some data 1', ..., 'some data 5']

(2) Then apply my function to this list (it returns a list of answers ['answer 1', ..., 'answer 5'])

(3) Then respond to each request individually, e.g. {'id':1, 'answers':'answer_1'}

I don't know how to implement steps (1) and (3).

Tags: python, flask, request, aiohttp

Solution


You can keep a cache of questions (the incoming requests) and a cache of answers (the responses), plus a background task that checks those caches: when the questions cache reaches 5 entries, run the GPU function and populate the answers cache. Each request handler then waits until the answers cache contains the data it needs.

server.py

import asyncio
from aiohttp import web


def gpu_func(items):
    """Send a batch of 5 questions to GPU and return the answers"""
    answers = {}
    for item in items:
        answers[item["id"]] = "answer for data: " + item["data"]
    return answers


async def gpu_loop(app):
    """Check questions cache continuously and when we have 5 questions process them and populate answers cache"""
    while True:
        if len(app.cache["questions"]) >= 5:
            print("running GPU function")
            answers = gpu_func(app.cache["questions"][:5])
            print("got %d answers from GPU" % len(answers))
            app.cache["answers"].update(answers)
            app.cache["questions"] = app.cache["questions"][5:]
        await asyncio.sleep(0.05)  # sleep for 50ms


async def handle(request):
    """Main request handler: populate questions cache and wait for the answer to be available in the answers cache"""
    data = await request.post()
    print("got request with data ", data)
    request.app.cache["questions"].append(data)
    # a time limit could be added here with a counter (sleep_delay * counter = max wait per request); see the timeout sketch after this file
    while True:
        if data["id"] in request.app.cache["answers"]:
            break
        await asyncio.sleep(0.05)
    answer = request.app.cache["answers"].pop(data["id"], "unknown")
    return web.Response(text=answer)


# create background task (gpu_loop)
async def start_background_tasks(app):
    app.gpu_loop = asyncio.create_task(gpu_loop(app))


# stop background task on shutdown
async def cleanup_background_tasks(app):
    app.gpu_loop.cancel()
    try:
        await app.gpu_loop
    except asyncio.CancelledError:
        pass  # cancellation on shutdown is expected


def main():
    app = web.Application()
    app.cache = {"questions": [], "answers": {}}
    app.add_routes([web.post("/", handle)])
    app.on_startup.append(start_background_tasks)
    app.on_cleanup.append(cleanup_background_tasks)
    web.run_app(app)


if __name__ == "__main__":
    main()
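A note on gpu_func: it is just a synchronous stand-in here. If the real GPU call blocks for a while, calling it directly inside gpu_loop also blocks the event loop, so no new requests are accepted while a batch is being processed. A minimal sketch of how the loop could hand the call off to a worker thread with run_in_executor (same gpu_func signature assumed):

async def gpu_loop(app):
    """Variant of the loop above: hand the blocking GPU call to a worker thread."""
    loop = asyncio.get_running_loop()
    while True:
        if len(app.cache["questions"]) >= 5:
            batch = app.cache["questions"][:5]
            app.cache["questions"] = app.cache["questions"][5:]
            # run_in_executor(None, ...) uses the default ThreadPoolExecutor,
            # so the event loop keeps accepting requests while the GPU works
            answers = await loop.run_in_executor(None, gpu_func, batch)
            app.cache["answers"].update(answers)
        await asyncio.sleep(0.05)  # sleep for 50ms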

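The time limit mentioned in the comment inside handle could be implemented by counting the polling iterations and giving up after a maximum wait. A rough sketch (max_wait and the 504 response are my own choices, not part of the original answer):

async def handle(request):
    """Same handler as above, but give up after roughly 10 seconds of waiting."""
    data = await request.post()
    request.app.cache["questions"].append(data)
    sleep_delay = 0.05
    max_wait = 10.0  # assumed limit; tune to your GPU batch time
    waited = 0.0
    while data["id"] not in request.app.cache["answers"]:
        if waited >= max_wait:
            return web.Response(status=504, text="timed out waiting for an answer")
        await asyncio.sleep(sleep_delay)
        waited += sleep_delay
    answer = request.app.cache["answers"].pop(data["id"], "unknown")
    return web.Response(text=answer)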
client.py

import aiohttp
import asyncio


async def make_request(session, num):
    """Make a single request using the existing session object and custom number"""
    url = "http://127.0.01:8080"
    data = {"id": num, "data": "question %d" % num}
    response = await session.post(url, data=data)
    text = await response.text()
    return text


async def main():
    """Make 20 consecutive requests with a delay of 20 ms between them"""
    tasks = []
    session = aiohttp.ClientSession()
    for i in range(20):
        print("making request %d", i)
        task = asyncio.ensure_future(make_request(session, i))
        tasks.append(task)
        await asyncio.sleep(0.02)
    responses = await asyncio.gather(*tasks)
    for response in responses:
        print(response)
    await session.close()


asyncio.run(main())
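
To try it out, start server.py first and then run client.py in a second terminal. The 20 client requests arrive roughly 20 ms apart, so the server should process them as four GPU batches of 5, and each request gets back its own answer.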

Depending on the concurrency (how often clients send requests) and the processing time (how long one batch takes to process), you may need to adjust the timings (the sleep intervals) and the cache limit (how many requests are held in the cache before a batch is run).
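If you do end up tuning these, it can help to pull the knobs out into module-level constants in server.py and reference them instead of the literal 5 and 0.05 (the names below are just suggestions, not part of the original answer):

BATCH_SIZE = 5        # how many questions to collect before one GPU run
POLL_INTERVAL = 0.05  # how often gpu_loop and handle re-check the caches, in seconds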

Hope this gets you started.

