首页 > 解决方案 > 内部带有烧瓶或 aiohttp 服务器的异步应用程序

问题描述

我已经改进了我去年的应用程序。一开始有两个不同的 python 应用程序 - 第一个用于统计统计数据,第二个 - 带有 GET 请求的 Web 服务器 gunicorn+flask。(centos 中的两种服务)统计数据会计算并将所有内容存储在 Postgres 中。Web 服务器连接到该 Postgres 数据库并回答 GET 请求。

在重写版本中,我使用 pandas 框架进行了所有统计,现在我想将这两个应用程序合并为一个。我使用 asyncio 来获取数据和统计数据。一切正常,现在我要添加 Web 服务器来响应 GET。

部分代码:

import asyncio
from contextlib import closing
import db_cl, tks_db
from formdf_cl import FormatDF

getinfofromtks = tks_db.TKS() # Class object to connect to third party database
formatdf = FormatDF() # counting class object, that stores some data
dbb = db_cl.MyDatabase('mydb.ini') # Class object to connect to my database


async def get_some_data():
    # getting information from third party database every 5 seconds.
    await asyncio.sleep(5)
    ans_inc, ans_out = getinfofromtks.getdf()
    return ans_inc, ans_out # two huge dataframes in pandas


async def process(ans_inc, ans_out):
    # counting data on CPU
    await asyncio.sleep(0)
    formatdf.leftjoin(ans_inc, ans_out)
    # storing statistics in my Database
    dbb.query_database('INSERT INTO statistic (timestamp, outgoing, incoming, stats) values (%s, %s,%s, %s)',
                       formatdf.make_count())
    dbb.commit_query()


async def main():
    while True:
        ans_inc, ans_out = await get_some_data()  # blocking, get data from third party database
        asyncio.ensure_future(process(ans_inc, ans_out))  # computing



if __name__ == "__main__":
    with closing(asyncio.get_event_loop()) as event_loop:
        event_loop.run_until_complete(main())

现在我希望将 http 服务器添加为线程应用程序(使用 flask 或 aiohttp),它将使用来自类对象 "formatdf" 的参数来响应 GET 请求。包含这些功能的最佳方式是什么?

标签: pythonpython-3.xflaskpython-asyncio

解决方案


我设法添加了一个 http 服务器作为协程。首先我尝试使用 aiohttp,但最终我找到了 Quart(与 Flask 相同,但它使用 Asyncio)。在 Quart 上运行 http 服务器的示例代码:

import quart
from quart import request
import json
import time

app = quart.Quart(__name__)

def resp(code, data):
    return quart.Response(
        status=code,
        mimetype="application/json",
        response=to_json(data)
    )

def to_json(data):
    return json.dumps(data) + "\n"

@app.route('/')
def root():
    return quart.redirect('/api/status2')


@app.errorhandler(400)
def page_not_found(e):
    return resp(400, {})


@app.errorhandler(404)
def page_not_found(e):
    return resp(400, {})


@app.errorhandler(405)
def page_not_found(e):
    return resp(405, {})


@app.route('/api/status2', methods=['GET'])
def get_status():
    timestamp = request.args.get("timestamp")
    delay = request.args.get("delay")
    if timestamp:
        return resp(200, {"time": time.time()})
    elif delay:
        return resp(200, {"test is:": '1'})
    else:
        return resp(200, {"", "ask me about time"})


if __name__ == "__main__":
    app.run(debug=True, host='0.0.0.0', port=5000)

为了将此代码添加为协程,我使用await asyncio.gather()并使用了 app.run_task 而不是 app.run。从我的问题中更改了代码,如下所示:

async def launcher_main():
    while True:
        ans_inc, ans_out = await get_some_data()
        asyncio.ensure_future(process(ans_inc, ans_out))


async def main():
    await asyncio.gather(launcher_main(),
                         restapi_quart.app.run_task(debug=True, host='0.0.0.0', port=5000))

剩下的最后一个问题是将“formatdf”类对象中的参数提供给我的 http 服务器。我已经实现了Tests.restapi_quart.app.config["formatdf"] = formatdf向 process(...) 函数添加行。从夸脱中调用它:

elif button:
    return resp(200, {"ok": app.config["formatdf"].attr})

推荐阅读