python - 将数据流式传输到数据库以供 Flask 应用使用
问题描述
我对这项任务相当陌生,所以请帮助我确定我缺少的概念。
我正在尝试将数据从 API 流式传输到我的 SQLite 数据库,并让 Flask 应用程序使用数据。models.py
我这样定义模型
# models.py
import os
from flask_sqlalchemy import SQLAlchemy
# Create sqlite db
db = SQLAlchemy()
class MyDataModel(db.Model):
# Manual table name choice
__tablename__ = 'table1'
id = db.Column(db.Integer, primary_key=True)
created_at = db.Column(db.Text)
text = db.Column(db.Text)
def __init__(self, created_at, text):
self.created_at = created_at
self.text = text
def __repr__(self):
return f"Data: {self.text} ... created at {self.created_at}"
在app.py
我有一个简单的视图函数,它计算行数并将服务器发送事件返回到前端以进行实时跟踪。
# app.py
import os
import time
from flask import Flask, render_template, url_for, redirect, Response
from flask_migrate import Migrate
from models import db, MyDataModel
from settings import *
app = Flask(__name__)
logging.basicConfig(level=logging.DEBUG)
basedir = os.path.abspath(os.path.dirname(__file__))
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + os.path.join(basedir, 'data.sqlite')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db.init_app(app)
Migrate(app, db)
@app.route('/')
def index():
return render_template('index.html')
@app.route('/getcount')
def getcount():
def count_stream():
count = db.session.query(MyDataModel).count()
while True:
yield f"data:{str(count)}\n\n"
time.sleep(0.5)
return Response(count_stream(), mimetype='text/event-stream')
if __name__ == "__main__":
app.run(debug=True, port=PORT)
现在我有另一个 python 脚本stream_to_db.py
,它从 API 获取数据作为流,大约像这样
# stream_to_db.py
import logging
import os
from models import db, MyDataModel
from settings import *
from SomeExternalAPI import SomeAPI
def stream_to_db():
api = SomeAPI(
API_KEY, API_SECRET_KEY, ACCESS_TOKEN, ACCESS_TOKEN_SECRET
)
r = api.request()
for item in r:
created_at, text = item['created_at'], item['text']
logging.info(text)
datum = MyDataModel(created_at, text)
db.session.add(datum)
db.session.commit()
# Stream data to sqlite
stream_to_db()
当我尝试运行它时python stream_to_db.py
出现错误
RuntimeError: No application found. Either work inside a view function or push an application context. See http://flask-sqlalchemy.pocoo.org/contexts/.
我查看了应用程序上下文的文档,但仍然感到困惑。假设我不使用 SQLAlchemy 并直接使用 Python 和 SQL 进行数据插入,这个stream_to_db.py
脚本应该独立于 Flask 应用程序。但是,如果我仍然想利用 SQLAlchemy 的语法和模型定义models.py
,我应该怎么做?
从概念上讲,我觉得流到 db 部分独立于 Flask 应用程序,应该是一个本质上是一个while True
循环并且永远存在的脚本。Flask 应用程序只是读取数据库,将数据发送到前端,没有做任何其他事情。我尝试将stream_to_db()
函数放入其中__main__
,app.py
但这没有意义,app.run()
而且stream_to_db()
本质上都是while True
循环,不能放在一起。
我迷路了,错过了这里的关键概念。请帮助并建议正确的方法/最佳实践。我觉得这是一项非常基本的任务,应该已经有了最佳实践和一组专用工具。提前致谢!
编辑
为了进一步实验,我导入app
并stream_to_db.py
添加了
with app.app_context():
stream_to_db()
现在我可以python stream_to_db.py
毫无问题地运行,但是如果我同时启动 Flask 应用程序,我会得到几个
Debugging middleware caught exception in streamed response at a point where response headers were already sent.
并且
Traceback (most recent call last):
File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/werkzeug/wsgi.py", line 507, in __next__
return self._next()
File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/werkzeug/wrappers/base_response.py", line 45, in _iter_encoded
for item in iterable:
File "/Users/<username>/webapps/<appname>/app.py", line 33, in count_stream
count = db.session.query(CardanoTweet).count()
File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/sqlalchemy/orm/scoping.py", line 162, in do
return getattr(self.registry(), name)(*args, **kwargs)
File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/sqlalchemy/util/_collections.py", line 1012, in __call__
return self.registry.setdefault(key, self.createfunc())
File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 3214, in __call__
return self.class_(**local_kw)
File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/flask_sqlalchemy/__init__.py", line 136, in __init__
self.app = app = db.get_app()
File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/flask_sqlalchemy/__init__.py", line 982, in get_app
'No application found. Either work inside a view function or push'
RuntimeError: No application found. Either work inside a view function or push an application context. See http://flask-sqlalchemy.pocoo.org/contexts/.
看起来 SQLAlchemy 找不到应用程序。不确定是否是关于db.init_app(app)
in app.py 和db = SQLAlchemy()
in models.py 的问题。我避免db = SQLAlchemy(app)
在 models.py 中使用,因为由于循环依赖的原因,我无法将应用程序导入模型。
编辑2
这次我将所有代码从 models.py 移到 app.py 并使用db = SQLAlchemy(app)
、删除db.init_app(app)
、将导入应用程序保留到 stream_to_db.py 和其中的应用程序上下文,它工作了!
我的问题
- 如何正确地将模型定义移动到
models.py
没有循环依赖的位置? - 应用程序上下文的含义是什么
stream_to_db.py
?如果我让 gunicorn 运行许多工人,那么基本上会有多个 Flask 应用程序实例。这会导致问题吗?
编辑3
感谢所有的答复。我不认为这与烧瓶 make_response with large files重复。
问题不在于将数据从 Flask 流式传输到客户端,而是将数据从外部 API 流式传输到 DB,并让 Flask 从 DB 中的数据中消耗一些统计信息。所以,我不明白为什么在概念上流式作业应该与 Flask 相关,它们是独立的。问题是我在数据模型和数据库事务的流式作业中使用 SQLAlchemy,而 SQLAlchemy 需要定义 Flask 应用程序。这部分是我感到困惑的地方。
使用 SQLAlchemy 定义的数据模型,编写这个将数据流式传输到 db 的不断运行的后台作业的正确方法是什么?我是否应该从流代码中删除 SQLAlchemy 代码并仅使用 SQL,并手动确保架构同意,如果有进一步的迁移?
解决方案
请尝试在您创建数据库的 models.py 中进行以下更改。db = SQLAlchemy(应用程序)
推荐阅读
- python - 提高除数查找功能的性能
- javascript - 页面加载时如何在我的视图模型中执行功能
- sql - 如何找到两个表列之间的时间差?
- python - 如何使用 python 从这个 .mol2 文件中提取数据?
- php - Wordpress - 在后端隐藏特定用户角色的特定字段
- spring-boot - 使用spring boot向wildfly JMS消息队列发送消息
- php - .php 文件在本地主机上工作但不在网络服务器上
- nginx - 无法使用 Ingress SubPath 访问 GUI
- node.js - 如何发送包含标题、状态码和正文的现有对象?
- c - local_irq_disable 无法按预期工作