首页 > 解决方案 > 将数据流式传输到数据库以供 Flask 应用使用

问题描述

我对这项任务相当陌生,所以请帮助我确定我缺少的概念。

我正在尝试将数据从 API 流式传输到我的 SQLite 数据库,并让 Flask 应用程序使用数据。models.py我这样定义模型

# models.py
import os
from flask_sqlalchemy import SQLAlchemy

# Create sqlite db
db = SQLAlchemy()


class MyDataModel(db.Model):
    # Manual table name choice
    __tablename__ = 'table1'

    id = db.Column(db.Integer, primary_key=True)
    created_at = db.Column(db.Text)
    text = db.Column(db.Text)

    def __init__(self, created_at, text):
        self.created_at = created_at
        self.text = text

    def __repr__(self):
        return f"Data: {self.text} ... created at {self.created_at}"

app.py我有一个简单的视图函数,它计算行数并将服务器发送事件返回到前端以进行实时跟踪。

# app.py
import os
import time
from flask import Flask, render_template, url_for, redirect, Response
from flask_migrate import Migrate
from models import db, MyDataModel
from settings import *


app = Flask(__name__)
logging.basicConfig(level=logging.DEBUG)

basedir = os.path.abspath(os.path.dirname(__file__))

app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + os.path.join(basedir, 'data.sqlite')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db.init_app(app)
Migrate(app, db)

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/getcount')
def getcount():
    def count_stream():
        count = db.session.query(MyDataModel).count()
        while True:
            yield f"data:{str(count)}\n\n"
            time.sleep(0.5)
    return Response(count_stream(), mimetype='text/event-stream')


if __name__ == "__main__":
    app.run(debug=True, port=PORT)

现在我有另一个 python 脚本stream_to_db.py,它从 API 获取数据作为流,大约像这样

# stream_to_db.py
import logging
import os
from models import db, MyDataModel
from settings import *
from SomeExternalAPI import SomeAPI


def stream_to_db():
    api = SomeAPI(
        API_KEY, API_SECRET_KEY, ACCESS_TOKEN, ACCESS_TOKEN_SECRET
    )

    r = api.request()

    for item in r:
        created_at, text = item['created_at'], item['text']
        logging.info(text)
        datum = MyDataModel(created_at, text)
        db.session.add(datum)
        db.session.commit()

# Stream data to sqlite
stream_to_db()

当我尝试运行它时python stream_to_db.py出现错误

RuntimeError: No application found. Either work inside a view function or push an application context. See http://flask-sqlalchemy.pocoo.org/contexts/.

我查看了应用程序上下文的文档,但仍然感到困惑。假设我不使用 SQLAlchemy 并直接使用 Python 和 SQL 进行数据插入,这个stream_to_db.py脚本应该独立于 Flask 应用程序。但是,如果我仍然想利用 SQLAlchemy 的语法和模型定义models.py,我应该怎么做?

从概念上讲,我觉得流到 db 部分独立于 Flask 应用程序,应该是一个本质上是一个while True循环并且永远存在的脚本。Flask 应用程序只是读取数据库,将数据发送到前端,没有做任何其他事情。我尝试将stream_to_db()函数放入其中__main__app.py但这没有意义,app.run()而且stream_to_db()本质上都是while True循环,不能放在一起。

我迷路了,错过了这里的关键概念。请帮助并建议正确的方法/最佳实践。我觉得这是一项非常基本的任务,应该已经有了最佳实践和一组专用工具。提前致谢!


编辑

为了进一步实验,我导入appstream_to_db.py添加了

with app.app_context():
    stream_to_db()

现在我可以python stream_to_db.py毫无问题地运行,但是如果我同时启动 Flask 应用程序,我会得到几个

Debugging middleware caught exception in streamed response at a point where response headers were already sent.

并且

Traceback (most recent call last):
  File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/werkzeug/wsgi.py", line 507, in __next__
    return self._next()
  File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/werkzeug/wrappers/base_response.py", line 45, in _iter_encoded
    for item in iterable:
  File "/Users/<username>/webapps/<appname>/app.py", line 33, in count_stream
    count = db.session.query(CardanoTweet).count()
  File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/sqlalchemy/orm/scoping.py", line 162, in do
    return getattr(self.registry(), name)(*args, **kwargs)
  File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/sqlalchemy/util/_collections.py", line 1012, in __call__
    return self.registry.setdefault(key, self.createfunc())
  File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 3214, in __call__
    return self.class_(**local_kw)
  File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/flask_sqlalchemy/__init__.py", line 136, in __init__
    self.app = app = db.get_app()
  File "/Users/<username>/webapps/<appname>/venv/lib/python3.6/site-packages/flask_sqlalchemy/__init__.py", line 982, in get_app
    'No application found. Either work inside a view function or push'
RuntimeError: No application found. Either work inside a view function or push an application context. See http://flask-sqlalchemy.pocoo.org/contexts/.

看起来 SQLAlchemy 找不到应用程序。不确定是否是关于db.init_app(app)in app.py 和db = SQLAlchemy()in models.py 的问题。我避免db = SQLAlchemy(app)在 models.py 中使用,因为由于循环依赖的原因,我无法将应用程序导入模型。


编辑2

这次我将所有代码从 models.py 移到 app.py 并使用db = SQLAlchemy(app)、删除db.init_app(app)、将导入应用程序保留到 stream_to_db.py 和其中的应用程序上下文,它工作了!

我的问题


编辑3

感谢所有的答复。我不认为这与烧瓶 make_response with large files重复。

问题不在于将数据从 Flask 流式传输到客户端,而是将数据从外部 API 流式传输到 DB,并让 Flask 从 DB 中的数据中消耗一些统计信息。所以,我不明白为什么在概念上流式作业应该与 Flask 相关,它们是独立的。问题是我在数据模型和数据库事务的流式作业中使用 SQLAlchemy,而 SQLAlchemy 需要定义 Flask 应用程序。这部分是我感到困惑的地方。

使用 SQLAlchemy 定义的数据模型,编写这个将数据流式传输到 db 的不断运行的后台作业的正确方法是什么?我是否应该从流代码中删除 SQLAlchemy 代码并仅使用 SQL,并手动确保架构同意,如果有进一步的迁移?

标签: pythonsqliteflasksqlalchemy

解决方案


请尝试在您创建数据库的 models.py 中进行以下更改。db = SQLAlchemy(应用程序)


推荐阅读