python - 使用 rq 处理带有蓝图和异步任务的循环导入
问题描述
我正在尝试使用rq
&redis
来处理异步任务。要求是任务会在后台做一些操作,并且会直接更新数据库。
到目前为止,只要没有进行数据库交互,我就能够完成所需的设置并且任务可以完美地工作。当我合并代码来触发数据库查询时,我得到了一个需要应用程序上下文的错误。现在,当我尝试使用它时,我开始收到循环导入错误。我已经在使用蓝图,但无法弄清楚修复它的方法。
我从这个样板开始,然后修改/添加了新文件,如下所示:-
应用程序/实用程序/worker.py
import os
import redis
from rq import Worker, Queue, Connection
listen = ['high', 'low', 'default']
redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
conn = redis.from_url(redis_url)
if __name__ == '__main__':
with Connection(conn):
worker = Worker(map(Queue, listen))
worker.work()
应用程序/实用程序/tasks.py
import sys
import time
from flask import Blueprint
from app.auth.models import User
from app.config import Config
from app import db, create_app
task_bp = Blueprint('task_bp', __name__)
flask_app = create_app(Config)
flask_app.app_context().push()
def background_task():
print("Task Started")
user = User.query.get(1)
print(user)
time.sleep(10)
print("Task Completed")
应用程序/__init__.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_bcrypt import Bcrypt
from flask_login import LoginManager
from flask_mail import Mail
from flask_migrate import Migrate
from app.config import Config
from app.utils.worker import conn
from rq import Queue
import rq_dashboard
db = SQLAlchemy()
migrate = Migrate()
bcrypt = Bcrypt()
login_manager = LoginManager()
login_manager.login_view = 'auth_bp.login'
login_manager.login_message_category = 'info'
mail = Mail()
q = Queue(connection=conn)
def create_app(config_class=Config):
flask_app = Flask(__name__)
flask_app.config.from_object(Config)
db.init_app(flask_app)
migrate.init_app(flask_app,db)
bcrypt.init_app(flask_app)
login_manager.init_app(flask_app)
mail.init_app(flask_app)
from app.auth.routes import auth_bp
from app.main.routes import main_bp
from app.errors.handlers import errors_bp
from app.utils.tasks import task_bp
flask_app.register_blueprint(auth_bp)
flask_app.register_blueprint(main_bp)
flask_app.register_blueprint(errors_bp)
flask_app.register_blueprint(task_bp)
flask_app.config.from_object(rq_dashboard.default_settings)
flask_app.register_blueprint(rq_dashboard.blueprint, url_prefix="/rq")
return flask_app
应用程序/main/routes.py
from flask import render_template, request, Blueprint
from app import db, q
from app.utils.tasks import background_task
main_bp = Blueprint('main_bp', __name__)
@main_bp.route("/")
@main_bp.route("/home")
def home():
return render_template('home.html', title='Home')
@main_bp.route("/about")
def about():
q.enqueue(background_task)
return render_template('about.html', title='About')
错误 :-
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "./app/utils/tasks.py", line 11, in <module>
flask_app = create_app(Config)
File "./app/__init__.py", line 34, in create_app
from app.main.routes import main_bp
File "./app/main/routes.py", line 4, in <module>
from app.utils.tasks import background_task
ImportError: cannot import name 'background_task' from 'app.utils.tasks' (./app/utils/tasks.py)
感谢是否有人可以帮助我解决它并建议一些使用rq
和蓝图的最佳实践。
解决方案
我能够通过在任务函数体内移动导入语句来解决它。我不确定这是否是解决它的正确方法,或者是否有其他干净的解决方案。
import sys
import time
from app.auth.models import User
def background_task():
from app.config import Config
from app import db, create_app
flask_app = create_app(Config)
flask_app.app_context().push()
print("Task Started")
user = User.query.get(1)
print(user)
time.sleep(10)
print("Task Completed")
推荐阅读
- django-rest-framework - 如何序列化模型实例列表?
- c# - 在子路径上托管 Blazor 服务器
- python - 在 Subplot Python 中绘制对数刻度
- intellij-idea - 如何通过按住 Alt + 鼠标选择来使用列选择模式 Intellij IDE?
- python - 无法在 pycharm 上安装 pygame
- c++ - 从同一类的另一个成员中引用类成员对象是不好的做法吗?
- docker - Laradock,在 localhost 上使用 Nginx 配置 SSL
- r - 如何修复此错误:Recipes 无法在 Caret:: Train 中加载?
- python - 错误:需要 Microsoft Visual C++ 14.0。使用“Microsoft Visual C++ Build Tools”来安装 Python 模块
- c++ - Puzzle solving c++ code sometimes works and sometimes it doesn't