python - 内部带有烧瓶或 aiohttp 服务器的异步应用程序
问题描述
我已经改进了我去年的应用程序。一开始有两个不同的 python 应用程序 - 第一个用于统计统计数据,第二个 - 带有 GET 请求的 Web 服务器 gunicorn+flask。(centos 中的两种服务)统计数据会计算并将所有内容存储在 Postgres 中。Web 服务器连接到该 Postgres 数据库并回答 GET 请求。
在重写版本中,我使用 pandas 框架进行了所有统计,现在我想将这两个应用程序合并为一个。我使用 asyncio 来获取数据和统计数据。一切正常,现在我要添加 Web 服务器来响应 GET。
部分代码:
import asyncio
from contextlib import closing
import db_cl, tks_db
from formdf_cl import FormatDF
getinfofromtks = tks_db.TKS() # Class object to connect to third party database
formatdf = FormatDF() # counting class object, that stores some data
dbb = db_cl.MyDatabase('mydb.ini') # Class object to connect to my database
async def get_some_data():
# getting information from third party database every 5 seconds.
await asyncio.sleep(5)
ans_inc, ans_out = getinfofromtks.getdf()
return ans_inc, ans_out # two huge dataframes in pandas
async def process(ans_inc, ans_out):
# counting data on CPU
await asyncio.sleep(0)
formatdf.leftjoin(ans_inc, ans_out)
# storing statistics in my Database
dbb.query_database('INSERT INTO statistic (timestamp, outgoing, incoming, stats) values (%s, %s,%s, %s)',
formatdf.make_count())
dbb.commit_query()
async def main():
while True:
ans_inc, ans_out = await get_some_data() # blocking, get data from third party database
asyncio.ensure_future(process(ans_inc, ans_out)) # computing
if __name__ == "__main__":
with closing(asyncio.get_event_loop()) as event_loop:
event_loop.run_until_complete(main())
现在我希望将 http 服务器添加为线程应用程序(使用 flask 或 aiohttp),它将使用来自类对象 "formatdf" 的参数来响应 GET 请求。包含这些功能的最佳方式是什么?
解决方案
我设法添加了一个 http 服务器作为协程。首先我尝试使用 aiohttp,但最终我找到了 Quart(与 Flask 相同,但它使用 Asyncio)。在 Quart 上运行 http 服务器的示例代码:
import quart
from quart import request
import json
import time
app = quart.Quart(__name__)
def resp(code, data):
return quart.Response(
status=code,
mimetype="application/json",
response=to_json(data)
)
def to_json(data):
return json.dumps(data) + "\n"
@app.route('/')
def root():
return quart.redirect('/api/status2')
@app.errorhandler(400)
def page_not_found(e):
return resp(400, {})
@app.errorhandler(404)
def page_not_found(e):
return resp(400, {})
@app.errorhandler(405)
def page_not_found(e):
return resp(405, {})
@app.route('/api/status2', methods=['GET'])
def get_status():
timestamp = request.args.get("timestamp")
delay = request.args.get("delay")
if timestamp:
return resp(200, {"time": time.time()})
elif delay:
return resp(200, {"test is:": '1'})
else:
return resp(200, {"", "ask me about time"})
if __name__ == "__main__":
app.run(debug=True, host='0.0.0.0', port=5000)
为了将此代码添加为协程,我使用await asyncio.gather()
并使用了 app.run_task 而不是 app.run。从我的问题中更改了代码,如下所示:
async def launcher_main():
while True:
ans_inc, ans_out = await get_some_data()
asyncio.ensure_future(process(ans_inc, ans_out))
async def main():
await asyncio.gather(launcher_main(),
restapi_quart.app.run_task(debug=True, host='0.0.0.0', port=5000))
剩下的最后一个问题是将“formatdf”类对象中的参数提供给我的 http 服务器。我已经实现了Tests.restapi_quart.app.config["formatdf"] = formatdf
向 process(...) 函数添加行。从夸脱中调用它:
elif button:
return resp(200, {"ok": app.config["formatdf"].attr})
推荐阅读
- css - 最小高度:如果父母只设置了最小高度而不是高度,则 100% 不起作用
- angular - Firestore - “缺少权限或权限不足”,但用户已登录且规则正常
- angular - 以角度方式升级到 msal v2 后,我收到此错误 crbug/1173575, non-JS module files deprecated
- r - rcpparmadillio 中的自导函数?
- c# - 等效于“Into”的 Linq 方法
- android - Android Kotlin 将数据从 NavController 传递到最后一个 Fragment
- python-3.x - 在 eltima com 端口上进行 Pyserial 读取
- jquery - 将php时间戳与Jquery进行比较
- javascript - 反应原生 - 无法水平居中我的滚动视图
- mysql - 我想找出查询哪个 group by 后跟 order by